From c3e993bbc3c80c6b218adf2143c227f890dc9728 Mon Sep 17 00:00:00 2001 From: RamiBerm Date: Sun, 23 May 2021 17:48:33 +0300 Subject: [PATCH] WIP --- Dockerfile | 10 ++-- api/go.mod | 3 ++ api/main.go | 30 +++++++++++- api/pkg/api/socket_client.go | 60 ------------------------ api/pkg/api/socket_server_handlers.go | 21 ++++++++- api/pkg/controllers/status_controller.go | 12 +++++ api/pkg/routes/public_routes.go | 2 + cli/go.mod | 4 ++ cli/go.sum | 1 + cli/mizu/consts.go | 2 +- cli/mizu/controlSocket.go | 41 ++++++++++++++++ cli/mizu/mizuRunner.go | 37 ++++++++++----- debug.Dockerfile | 10 ++-- shared/go.mod | 7 +++ shared/models.go | 22 +++++++++ shared/socket_client.go | 33 +++++++++++++ 16 files changed, 211 insertions(+), 84 deletions(-) delete mode 100644 api/pkg/api/socket_client.go create mode 100644 api/pkg/controllers/status_controller.go create mode 100644 cli/mizu/controlSocket.go create mode 100644 shared/go.mod create mode 100644 shared/models.go create mode 100644 shared/socket_client.go diff --git a/Dockerfile b/Dockerfile index bc276a398..e85745689 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM node:14-slim AS site-build -WORKDIR /ui-build +WORKDIR /app/ui-build COPY ui . RUN npm i @@ -14,14 +14,16 @@ ENV CGO_ENABLED=1 GOOS=linux GOARCH=amd64 RUN apk add libpcap-dev gcc g++ make # Move to api working directory (/api-build). -WORKDIR /api-build +WORKDIR /app/api-build COPY api/go.mod api/go.sum ./ +COPY shared/go.mod shared/go.mod ../shared/ RUN go mod download # cheap trick to make the build faster (As long as go.mod wasn't changes) RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get # Copy and build api code +COPY shared ../shared COPY api . RUN go build -ldflags="-s -w" -o mizuagent . @@ -32,8 +34,8 @@ RUN apk add bash libpcap-dev tcpdump WORKDIR /app # Copy binary and config files from /build to root folder of scratch container. -COPY --from=builder ["/api-build/mizuagent", "."] -COPY --from=site-build ["/ui-build/build", "site"] +COPY --from=builder ["/app/api-build/mizuagent", "."] +COPY --from=site-build ["/app/ui-build/build", "site"] COPY api/start.sh . diff --git a/api/go.mod b/api/go.mod index d0d831ebc..7aa4d41b8 100644 --- a/api/go.mod +++ b/api/go.mod @@ -23,4 +23,7 @@ require ( k8s.io/api v0.21.0 k8s.io/apimachinery v0.21.0 k8s.io/client-go v0.21.0 + github.com/up9inc/mizu/shared v0.0.0 ) + +replace "github.com/up9inc/mizu/shared" v0.0.0 => "../shared" diff --git a/api/main.go b/api/main.go index 2b2ce8aee..7f321355e 100644 --- a/api/main.go +++ b/api/main.go @@ -5,6 +5,8 @@ import ( "flag" "fmt" "github.com/gofiber/fiber/v2" + "github.com/gorilla/websocket" + "github.com/up9inc/mizu/shared" "mizuserver/pkg/api" "mizuserver/pkg/middleware" "mizuserver/pkg/routes" @@ -45,11 +47,11 @@ func main() { } harOutputChannel := tap.StartPassiveTapper() - socketConnection, err := api.ConnectToSocketServer(*aggregatorAddress) + socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err)) } - go api.PipeChannelToSocket(socketConnection, harOutputChannel) + go pipeChannelToSocket(socketConnection, harOutputChannel) } else if *aggregator { socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000) go api.StartReadingEntries(socketHarOutChannel, nil) @@ -94,3 +96,27 @@ func getTapTargets() []string { } return tappedAddressesPerNodeDict[nodeName] } + +func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) { + if connection == nil { + panic("Websocket connection is nil") + } + + if messageDataChannel == nil { + panic("Channel of captured messages is nil") + } + + for messageData := range messageDataChannel { + marshaledData, err := json.Marshal(messageData) + if err != nil { + fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err) + continue + } + + err = connection.WriteMessage(websocket.TextMessage, marshaledData) + if err != nil { + fmt.Printf("error sending message through socket server %s, (%v,%+v)\n", err, err, err) + continue + } + } +} diff --git a/api/pkg/api/socket_client.go b/api/pkg/api/socket_client.go deleted file mode 100644 index db522be4c..000000000 --- a/api/pkg/api/socket_client.go +++ /dev/null @@ -1,60 +0,0 @@ -package api - -import ( - "encoding/json" - "fmt" - "github.com/gorilla/websocket" - "mizuserver/pkg/tap" - "time" -) - -func ConnectToSocketServer(address string) (*websocket.Conn, error) { - const maxTry = 3 - const sleepTime = time.Second * 10 - var err error - var connection *websocket.Conn - try := 0 - - // Connection to server fails if client pod is up before server. - // Retries solve this issue. - for try < maxTry { - connection, _, err = websocket.DefaultDialer.Dial(address, nil) - if err != nil { - try++ - fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err) - } else { - break - } - time.Sleep(sleepTime) - } - - if err != nil { - return nil, err - } - - return connection, nil -} - -func PipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) { - if connection == nil { - panic("Websocket connection is nil") - } - - if messageDataChannel == nil { - panic("Channel of captured messages is nil") - } - - for messageData := range messageDataChannel { - marshaledData, err := json.Marshal(messageData) - if err != nil { - fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err) - continue - } - - err = connection.WriteMessage(websocket.TextMessage, marshaledData) - if err != nil { - fmt.Printf("error sending message through socket server %s, (%v,%+v)\n", err, err, err) - continue - } - } -} diff --git a/api/pkg/api/socket_server_handlers.go b/api/pkg/api/socket_server_handlers.go index 6855d4292..4a8699e0e 100644 --- a/api/pkg/api/socket_server_handlers.go +++ b/api/pkg/api/socket_server_handlers.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" "github.com/antoniodipinto/ikisocket" + "github.com/up9inc/mizu/shared" + "mizuserver/pkg/controllers" "mizuserver/pkg/routes" "mizuserver/pkg/tap" ) @@ -52,6 +54,8 @@ func (h *RoutesEventHandlers) WebSocketError(ep *ikisocket.EventPayload) { } func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) { + fmt.Println("Received socket message") + //TODO: singular flow for unmarshalling messages, tapper messages should use ControlSocketMessage (Which should be renamed) if ep.Kws.GetAttribute("is_tapper") == true && h.SocketHarOutChannel != nil{ var tapOutput tap.OutputChannelItem err := json.Unmarshal(ep.Data, &tapOutput) @@ -61,7 +65,22 @@ func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) { h.SocketHarOutChannel <- &tapOutput } } else { - fmt.Println("Received Web socket message, unable to handle message") + var controlMessage shared.ControlSocketMessage + err := json.Unmarshal(ep.Data, &controlMessage) + if err != nil { + fmt.Printf("Could not unmarshal message received from tapper websocket %v\n", err) + } else { + if controlMessage.MessageType == shared.TAPPING_STATUS_MESSAGE_TYPE { + tappingUpdateData, ok := controlMessage.Data.(*shared.TapStatus) + if ok { + controllers.TapStatus = tappingUpdateData + } else { + fmt.Printf("Could not cast tap status socket message") + } + } else { + fmt.Printf("Received socket message of type %s for which no handlers are defined", controlMessage.MessageType) + } + } } } diff --git a/api/pkg/controllers/status_controller.go b/api/pkg/controllers/status_controller.go new file mode 100644 index 000000000..4eaaba809 --- /dev/null +++ b/api/pkg/controllers/status_controller.go @@ -0,0 +1,12 @@ +package controllers + +import ( + "github.com/gofiber/fiber/v2" + "github.com/up9inc/mizu/shared" +) + +var TapStatus *shared.TapStatus + +func GetTappingStatus(c *fiber.Ctx) error { + return c.Status(fiber.StatusOK).JSON(TapStatus) +} diff --git a/api/pkg/routes/public_routes.go b/api/pkg/routes/public_routes.go index eac2d86ef..d2db661a3 100644 --- a/api/pkg/routes/public_routes.go +++ b/api/pkg/routes/public_routes.go @@ -14,4 +14,6 @@ func EntriesRoutes(fiberApp *fiber.App) { routeGroup.Get("/resetDB", controllers.DeleteAllEntries) // get single (full) entry routeGroup.Get("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB + + routeGroup.Get("/tapStatus", controllers.GetTappingStatus) // get tapping status } diff --git a/cli/go.mod b/cli/go.mod index c19ade7b4..bac19429d 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -7,4 +7,8 @@ require ( k8s.io/api v0.21.0 k8s.io/apimachinery v0.21.0 k8s.io/client-go v0.21.0 + github.com/gorilla/websocket v1.4.2 + github.com/up9inc/mizu/shared v0.0.0 ) + +replace "github.com/up9inc/mizu/shared" v0.0.0 => "../shared" diff --git a/cli/go.sum b/cli/go.sum index dd1710de5..733186ba9 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -152,6 +152,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I= github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= diff --git a/cli/mizu/consts.go b/cli/mizu/consts.go index b4385daa9..b91547020 100644 --- a/cli/mizu/consts.go +++ b/cli/mizu/consts.go @@ -2,7 +2,7 @@ package mizu var ( Version = "v0.0.1" - Branch = "develop" + Branch = "rami-test" GitCommitHash = "" // this var is overridden using ldflags in makefile when building ) diff --git a/cli/mizu/controlSocket.go b/cli/mizu/controlSocket.go new file mode 100644 index 000000000..59dac1e0f --- /dev/null +++ b/cli/mizu/controlSocket.go @@ -0,0 +1,41 @@ +package mizu + +import ( + "encoding/json" + "github.com/gorilla/websocket" + "github.com/up9inc/mizu/shared" + core "k8s.io/api/core/v1" +) + +type ControlSocket struct { + connection *websocket.Conn +} + +func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) { + connection, err := shared.ConnectToSocketServer(socketServerAddress) + if err != nil { + return nil, err + } else { + return &ControlSocket{connection: connection}, nil + } +} + +func (controlSocket *ControlSocket) SendNewTappedPodsListMessage(namespace string, pods []core.Pod) error { + podInfos := make([]shared.PodInfo, 0) + for _, pod := range pods { + podInfos = append(podInfos, shared.PodInfo{Name: pod.Name}) + } + tapStatus := shared.TapStatus{Namespace: namespace, Pods: podInfos} + socketMessage := shared.ControlSocketMessage{MessageType: shared.TAPPING_STATUS_MESSAGE_TYPE, Data: tapStatus} + + jsonMessage, err := json.Marshal(socketMessage) + if err != nil { + return err + } + err = controlSocket.connection.WriteMessage(websocket.TextMessage, jsonMessage) + if err != nil { + return err + } + + return nil +} diff --git a/cli/mizu/mizuRunner.go b/cli/mizu/mizuRunner.go index 3d7525751..fb2d75a96 100644 --- a/cli/mizu/mizuRunner.go +++ b/cli/mizu/mizuRunner.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/kubernetes" + core "k8s.io/api/core/v1" "os" "os/signal" "regexp" @@ -17,7 +18,13 @@ func Run(podRegexQuery *regexp.Regexp) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // cancel will be called when this function exits - nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, podRegexQuery) + matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery) + if err != nil { + fmt.Printf("Error getting pods to tap %v\n", err) + return + } + + nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, matchingPods) if err != nil { cleanUpMizuResources(kubernetesProvider) return @@ -28,6 +35,18 @@ func Run(podRegexQuery *regexp.Regexp) { return } go portForwardApiPod(ctx, kubernetesProvider, cancel) //TODO convert this to job for built in pod ttl or have the running app handle this + + controlSocket, err := CreateControlSocket(fmt.Sprintf("ws://localhost:%d/ws", config.Configuration.GuiPort)) + if err != nil { + fmt.Printf("error establishing control socket connection %s\n", err) + cancel() + } + time.Sleep(2 * time.Second) + err = controlSocket.SendNewTappedPodsListMessage(kubernetesProvider.Namespace, matchingPods) + if err != nil { + fmt.Printf("error Sending message via control socket %s\n", err) + } + waitForFinish(ctx, cancel) //block until exit signal or error // TODO handle incoming traffic from tapper using a channel @@ -60,13 +79,13 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) { removalCtx, _ := context.WithTimeout(context.Background(), 5 * time.Second) if err := kubernetesProvider.RemovePod(removalCtx, MizuResourcesNamespace, aggregatorPodName); err != nil { - fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err); + fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err) } if err := kubernetesProvider.RemoveService(removalCtx, MizuResourcesNamespace, aggregatorPodName); err != nil { - fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err); + fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err) } if err := kubernetesProvider.RemoveDaemonSet(removalCtx, MizuResourcesNamespace, TapperDaemonSetName); err != nil { - fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", TapperDaemonSetName, MizuResourcesNamespace, err, err, err); + fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", TapperDaemonSetName, MizuResourcesNamespace, err, err, err) } } @@ -156,13 +175,9 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P return true } -func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, regex *regexp.Regexp) (map[string][]string, error) { - matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, regex) - if err != nil { - return nil, err - } +func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappedPods []core.Pod) (map[string][]string, error) { nodeToTappedPodIPMap := make(map[string][]string, 0) - for _, pod := range matchingPods { + for _, pod := range tappedPods { existingList := nodeToTappedPodIPMap[pod.Spec.NodeName] if existingList == nil { nodeToTappedPodIPMap[pod.Spec.NodeName] = []string {pod.Status.PodIP} @@ -185,5 +200,3 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) { cancel() } } - - diff --git a/debug.Dockerfile b/debug.Dockerfile index 2dab3c6f7..a8cc3d159 100644 --- a/debug.Dockerfile +++ b/debug.Dockerfile @@ -1,7 +1,7 @@ # creates image in which mizu api is remotely debuggable using delve FROM node:14-slim AS site-build -WORKDIR /ui-build +WORKDIR /app/ui-build COPY ui . RUN npm i @@ -15,14 +15,16 @@ ENV CGO_ENABLED=1 GOOS=linux GOARCH=amd64 RUN apk add libpcap-dev gcc g++ make # Move to api working directory (/api-build). -WORKDIR /api-build +WORKDIR /app/api-build COPY api/go.mod api/go.sum ./ +COPY shared/go.mod shared/go.mod ../shared/ RUN go mod download # cheap trick to make the build faster (As long as go.mod wasn't changes) RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get # Copy and build api code +COPY shared ../shared COPY api . RUN go build -gcflags="all=-N -l" -o mizuagent . @@ -33,8 +35,8 @@ RUN apk add bash libpcap-dev tcpdump WORKDIR /app # Copy binary and config files from /build to root folder of scratch container. -COPY --from=builder ["/api-build/mizuagent", "."] -COPY --from=site-build ["/ui-build/build", "site"] +COPY --from=builder ["/app/api-build/mizuagent", "."] +COPY --from=site-build ["/app/ui-build/build", "site"] # install remote debugging tool RUN go get github.com/go-delve/delve/cmd/dlv diff --git a/shared/go.mod b/shared/go.mod new file mode 100644 index 000000000..b8b1ceac8 --- /dev/null +++ b/shared/go.mod @@ -0,0 +1,7 @@ + module github.com/up9inc/mizu/shared + + go 1.16 + + require ( + github.com/gorilla/websocket v1.4.2 + ) diff --git a/shared/models.go b/shared/models.go new file mode 100644 index 000000000..bda2257dc --- /dev/null +++ b/shared/models.go @@ -0,0 +1,22 @@ +package shared + +type ControlSocketMessageType string + +const ( + TAPPING_STATUS_MESSAGE_TYPE ControlSocketMessageType = "tappingStatus" +) + +type ControlSocketMessage struct { + MessageType ControlSocketMessageType `json:"messageType"` + Data interface{} `json:"data"` +} + +type TapStatus struct { + Namespace string `json:"namespace"` + Pods []PodInfo `json:"pods"` +} + +type PodInfo struct { + Name string `json:"name"` +} + diff --git a/shared/socket_client.go b/shared/socket_client.go new file mode 100644 index 000000000..862ec7625 --- /dev/null +++ b/shared/socket_client.go @@ -0,0 +1,33 @@ +package shared + +import ( + "github.com/gorilla/websocket" + "time" +) + +func ConnectToSocketServer(address string) (*websocket.Conn, error) { + const maxTry = 5 + const sleepTime = time.Second * 2 + var err error + var connection *websocket.Conn + try := 0 + + // Connection to server fails if client pod is up before server. + // Retries solve this issue. + for try < maxTry { + connection, _, err = websocket.DefaultDialer.Dial(address, nil) + if err != nil { + try++ + // fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err) + } else { + break + } + time.Sleep(sleepTime) + } + + if err != nil { + return nil, err + } + + return connection, nil +}