This commit is contained in:
RamiBerm
2021-05-23 17:48:33 +03:00
parent 5cbb5a011e
commit c3e993bbc3
16 changed files with 211 additions and 84 deletions

View File

@@ -1,6 +1,6 @@
FROM node:14-slim AS site-build
WORKDIR /ui-build
WORKDIR /app/ui-build
COPY ui .
RUN npm i
@@ -14,14 +14,16 @@ ENV CGO_ENABLED=1 GOOS=linux GOARCH=amd64
RUN apk add libpcap-dev gcc g++ make
# Move to api working directory (/api-build).
WORKDIR /api-build
WORKDIR /app/api-build
COPY api/go.mod api/go.sum ./
COPY shared/go.mod shared/go.mod ../shared/
RUN go mod download
# cheap trick to make the build faster (As long as go.mod wasn't changes)
RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get
# Copy and build api code
COPY shared ../shared
COPY api .
RUN go build -ldflags="-s -w" -o mizuagent .
@@ -32,8 +34,8 @@ RUN apk add bash libpcap-dev tcpdump
WORKDIR /app
# Copy binary and config files from /build to root folder of scratch container.
COPY --from=builder ["/api-build/mizuagent", "."]
COPY --from=site-build ["/ui-build/build", "site"]
COPY --from=builder ["/app/api-build/mizuagent", "."]
COPY --from=site-build ["/app/ui-build/build", "site"]
COPY api/start.sh .

View File

@@ -23,4 +23,7 @@ require (
k8s.io/api v0.21.0
k8s.io/apimachinery v0.21.0
k8s.io/client-go v0.21.0
github.com/up9inc/mizu/shared v0.0.0
)
replace "github.com/up9inc/mizu/shared" v0.0.0 => "../shared"

View File

@@ -5,6 +5,8 @@ import (
"flag"
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
"mizuserver/pkg/api"
"mizuserver/pkg/middleware"
"mizuserver/pkg/routes"
@@ -45,11 +47,11 @@ func main() {
}
harOutputChannel := tap.StartPassiveTapper()
socketConnection, err := api.ConnectToSocketServer(*aggregatorAddress)
socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err))
}
go api.PipeChannelToSocket(socketConnection, harOutputChannel)
go pipeChannelToSocket(socketConnection, harOutputChannel)
} else if *aggregator {
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
go api.StartReadingEntries(socketHarOutChannel, nil)
@@ -94,3 +96,27 @@ func getTapTargets() []string {
}
return tappedAddressesPerNodeDict[nodeName]
}
func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) {
if connection == nil {
panic("Websocket connection is nil")
}
if messageDataChannel == nil {
panic("Channel of captured messages is nil")
}
for messageData := range messageDataChannel {
marshaledData, err := json.Marshal(messageData)
if err != nil {
fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err)
continue
}
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
fmt.Printf("error sending message through socket server %s, (%v,%+v)\n", err, err, err)
continue
}
}
}

View File

@@ -1,60 +0,0 @@
package api
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"mizuserver/pkg/tap"
"time"
)
func ConnectToSocketServer(address string) (*websocket.Conn, error) {
const maxTry = 3
const sleepTime = time.Second * 10
var err error
var connection *websocket.Conn
try := 0
// Connection to server fails if client pod is up before server.
// Retries solve this issue.
for try < maxTry {
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
if err != nil {
try++
fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err)
} else {
break
}
time.Sleep(sleepTime)
}
if err != nil {
return nil, err
}
return connection, nil
}
func PipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) {
if connection == nil {
panic("Websocket connection is nil")
}
if messageDataChannel == nil {
panic("Channel of captured messages is nil")
}
for messageData := range messageDataChannel {
marshaledData, err := json.Marshal(messageData)
if err != nil {
fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err)
continue
}
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
fmt.Printf("error sending message through socket server %s, (%v,%+v)\n", err, err, err)
continue
}
}
}

View File

@@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"github.com/antoniodipinto/ikisocket"
"github.com/up9inc/mizu/shared"
"mizuserver/pkg/controllers"
"mizuserver/pkg/routes"
"mizuserver/pkg/tap"
)
@@ -52,6 +54,8 @@ func (h *RoutesEventHandlers) WebSocketError(ep *ikisocket.EventPayload) {
}
func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) {
fmt.Println("Received socket message")
//TODO: singular flow for unmarshalling messages, tapper messages should use ControlSocketMessage (Which should be renamed)
if ep.Kws.GetAttribute("is_tapper") == true && h.SocketHarOutChannel != nil{
var tapOutput tap.OutputChannelItem
err := json.Unmarshal(ep.Data, &tapOutput)
@@ -61,7 +65,22 @@ func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) {
h.SocketHarOutChannel <- &tapOutput
}
} else {
fmt.Println("Received Web socket message, unable to handle message")
var controlMessage shared.ControlSocketMessage
err := json.Unmarshal(ep.Data, &controlMessage)
if err != nil {
fmt.Printf("Could not unmarshal message received from tapper websocket %v\n", err)
} else {
if controlMessage.MessageType == shared.TAPPING_STATUS_MESSAGE_TYPE {
tappingUpdateData, ok := controlMessage.Data.(*shared.TapStatus)
if ok {
controllers.TapStatus = tappingUpdateData
} else {
fmt.Printf("Could not cast tap status socket message")
}
} else {
fmt.Printf("Received socket message of type %s for which no handlers are defined", controlMessage.MessageType)
}
}
}
}

View File

@@ -0,0 +1,12 @@
package controllers
import (
"github.com/gofiber/fiber/v2"
"github.com/up9inc/mizu/shared"
)
var TapStatus *shared.TapStatus
func GetTappingStatus(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).JSON(TapStatus)
}

View File

@@ -14,4 +14,6 @@ func EntriesRoutes(fiberApp *fiber.App) {
routeGroup.Get("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
routeGroup.Get("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB
routeGroup.Get("/tapStatus", controllers.GetTappingStatus) // get tapping status
}

View File

@@ -7,4 +7,8 @@ require (
k8s.io/api v0.21.0
k8s.io/apimachinery v0.21.0
k8s.io/client-go v0.21.0
github.com/gorilla/websocket v1.4.2
github.com/up9inc/mizu/shared v0.0.0
)
replace "github.com/up9inc/mizu/shared" v0.0.0 => "../shared"

View File

@@ -152,6 +152,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=

View File

@@ -2,7 +2,7 @@ package mizu
var (
Version = "v0.0.1"
Branch = "develop"
Branch = "rami-test"
GitCommitHash = "" // this var is overridden using ldflags in makefile when building
)

41
cli/mizu/controlSocket.go Normal file
View File

@@ -0,0 +1,41 @@
package mizu
import (
"encoding/json"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
)
type ControlSocket struct {
connection *websocket.Conn
}
func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) {
connection, err := shared.ConnectToSocketServer(socketServerAddress)
if err != nil {
return nil, err
} else {
return &ControlSocket{connection: connection}, nil
}
}
func (controlSocket *ControlSocket) SendNewTappedPodsListMessage(namespace string, pods []core.Pod) error {
podInfos := make([]shared.PodInfo, 0)
for _, pod := range pods {
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name})
}
tapStatus := shared.TapStatus{Namespace: namespace, Pods: podInfos}
socketMessage := shared.ControlSocketMessage{MessageType: shared.TAPPING_STATUS_MESSAGE_TYPE, Data: tapStatus}
jsonMessage, err := json.Marshal(socketMessage)
if err != nil {
return err
}
err = controlSocket.connection.WriteMessage(websocket.TextMessage, jsonMessage)
if err != nil {
return err
}
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/kubernetes"
core "k8s.io/api/core/v1"
"os"
"os/signal"
"regexp"
@@ -17,7 +18,13 @@ func Run(podRegexQuery *regexp.Regexp) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, podRegexQuery)
matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery)
if err != nil {
fmt.Printf("Error getting pods to tap %v\n", err)
return
}
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, matchingPods)
if err != nil {
cleanUpMizuResources(kubernetesProvider)
return
@@ -28,6 +35,18 @@ func Run(podRegexQuery *regexp.Regexp) {
return
}
go portForwardApiPod(ctx, kubernetesProvider, cancel) //TODO convert this to job for built in pod ttl or have the running app handle this
controlSocket, err := CreateControlSocket(fmt.Sprintf("ws://localhost:%d/ws", config.Configuration.GuiPort))
if err != nil {
fmt.Printf("error establishing control socket connection %s\n", err)
cancel()
}
time.Sleep(2 * time.Second)
err = controlSocket.SendNewTappedPodsListMessage(kubernetesProvider.Namespace, matchingPods)
if err != nil {
fmt.Printf("error Sending message via control socket %s\n", err)
}
waitForFinish(ctx, cancel) //block until exit signal or error
// TODO handle incoming traffic from tapper using a channel
@@ -60,13 +79,13 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
removalCtx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
if err := kubernetesProvider.RemovePod(removalCtx, MizuResourcesNamespace, aggregatorPodName); err != nil {
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err);
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err)
}
if err := kubernetesProvider.RemoveService(removalCtx, MizuResourcesNamespace, aggregatorPodName); err != nil {
fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err);
fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err)
}
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, MizuResourcesNamespace, TapperDaemonSetName); err != nil {
fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", TapperDaemonSetName, MizuResourcesNamespace, err, err, err);
fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", TapperDaemonSetName, MizuResourcesNamespace, err, err, err)
}
}
@@ -156,13 +175,9 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P
return true
}
func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, regex *regexp.Regexp) (map[string][]string, error) {
matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, regex)
if err != nil {
return nil, err
}
func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappedPods []core.Pod) (map[string][]string, error) {
nodeToTappedPodIPMap := make(map[string][]string, 0)
for _, pod := range matchingPods {
for _, pod := range tappedPods {
existingList := nodeToTappedPodIPMap[pod.Spec.NodeName]
if existingList == nil {
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string {pod.Status.PodIP}
@@ -185,5 +200,3 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
cancel()
}
}

View File

@@ -1,7 +1,7 @@
# creates image in which mizu api is remotely debuggable using delve
FROM node:14-slim AS site-build
WORKDIR /ui-build
WORKDIR /app/ui-build
COPY ui .
RUN npm i
@@ -15,14 +15,16 @@ ENV CGO_ENABLED=1 GOOS=linux GOARCH=amd64
RUN apk add libpcap-dev gcc g++ make
# Move to api working directory (/api-build).
WORKDIR /api-build
WORKDIR /app/api-build
COPY api/go.mod api/go.sum ./
COPY shared/go.mod shared/go.mod ../shared/
RUN go mod download
# cheap trick to make the build faster (As long as go.mod wasn't changes)
RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get
# Copy and build api code
COPY shared ../shared
COPY api .
RUN go build -gcflags="all=-N -l" -o mizuagent .
@@ -33,8 +35,8 @@ RUN apk add bash libpcap-dev tcpdump
WORKDIR /app
# Copy binary and config files from /build to root folder of scratch container.
COPY --from=builder ["/api-build/mizuagent", "."]
COPY --from=site-build ["/ui-build/build", "site"]
COPY --from=builder ["/app/api-build/mizuagent", "."]
COPY --from=site-build ["/app/ui-build/build", "site"]
# install remote debugging tool
RUN go get github.com/go-delve/delve/cmd/dlv

7
shared/go.mod Normal file
View File

@@ -0,0 +1,7 @@
module github.com/up9inc/mizu/shared
go 1.16
require (
github.com/gorilla/websocket v1.4.2
)

22
shared/models.go Normal file
View File

@@ -0,0 +1,22 @@
package shared
type ControlSocketMessageType string
const (
TAPPING_STATUS_MESSAGE_TYPE ControlSocketMessageType = "tappingStatus"
)
type ControlSocketMessage struct {
MessageType ControlSocketMessageType `json:"messageType"`
Data interface{} `json:"data"`
}
type TapStatus struct {
Namespace string `json:"namespace"`
Pods []PodInfo `json:"pods"`
}
type PodInfo struct {
Name string `json:"name"`
}

33
shared/socket_client.go Normal file
View File

@@ -0,0 +1,33 @@
package shared
import (
"github.com/gorilla/websocket"
"time"
)
func ConnectToSocketServer(address string) (*websocket.Conn, error) {
const maxTry = 5
const sleepTime = time.Second * 2
var err error
var connection *websocket.Conn
try := 0
// Connection to server fails if client pod is up before server.
// Retries solve this issue.
for try < maxTry {
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
if err != nil {
try++
// fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err)
} else {
break
}
time.Sleep(sleepTime)
}
if err != nil {
return nil, err
}
return connection, nil
}