mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-25 07:45:01 +00:00
parent
043b845c06
commit
749bee6d55
@ -65,6 +65,7 @@ func main() {
|
||||
|
||||
hostApi(nil)
|
||||
} else if *tapperMode {
|
||||
rlog.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
|
||||
if *apiServerAddress == "" {
|
||||
panic("API server address must be provided with --api-server-address when using --tap")
|
||||
}
|
||||
@ -77,13 +78,13 @@ func main() {
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
|
||||
socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
|
||||
socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
|
||||
}
|
||||
rlog.Infof("Connected successfully to websocket %s", *apiServerAddress)
|
||||
|
||||
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
|
||||
// go pipeOutboundLinksChannelToSocket(socketConnection, outboundLinkOutputChannel)
|
||||
} else if *apiServerMode {
|
||||
api.StartResolving(*namespace)
|
||||
|
||||
@ -122,7 +123,7 @@ func loadExtensions() {
|
||||
extensionsMap = make(map[string]*tapApi.Extension)
|
||||
for i, file := range files {
|
||||
filename := file.Name()
|
||||
log.Printf("Loading extension: %s\n", filename)
|
||||
rlog.Infof("Loading extension: %s\n", filename)
|
||||
extension := &tapApi.Extension{
|
||||
Path: path.Join(extensionsDir, filename),
|
||||
}
|
||||
@ -290,7 +291,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
|
||||
for messageData := range messageDataChannel {
|
||||
marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData)
|
||||
if err != nil {
|
||||
rlog.Infof("error converting message to json %s, (%v,%+v)\n", err, err, err)
|
||||
rlog.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -298,26 +299,8 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
|
||||
// and goes into the intermediate WebSocket.
|
||||
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
|
||||
if err != nil {
|
||||
rlog.Infof("error sending message through socket server %s, (%v,%+v)\n", err, err, err)
|
||||
rlog.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pipeOutboundLinksChannelToSocket(connection *websocket.Conn, outboundLinkChannel <-chan *tap.OutboundLink) {
|
||||
for outboundLink := range outboundLinkChannel {
|
||||
if outboundLink.SuggestedProtocol == tap.TLSProtocol {
|
||||
marshaledData, err := models.CreateWebsocketOutboundLinkMessage(outboundLink)
|
||||
if err != nil {
|
||||
rlog.Infof("Error converting outbound link to json %s, (%v,%+v)", err, err, err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
|
||||
if err != nil {
|
||||
rlog.Infof("error sending outbound link message through socket server %s, (%v,%+v)", err, err, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/romana/rlog"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
@ -65,7 +66,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fmt.Println(matchValue, rule.Value)
|
||||
rlog.Info(matchValue, rule.Value)
|
||||
} else {
|
||||
val := fmt.Sprint(out)
|
||||
matchValue, err = regexp.MatchString(rule.Value, val)
|
||||
|
@ -4,13 +4,12 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/martian/har"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Keep it because we might want cookies in the future
|
||||
|
@ -1,8 +1,8 @@
|
||||
package shared
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/romana/rlog"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -11,24 +11,23 @@ const (
|
||||
DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
|
||||
)
|
||||
|
||||
func ConnectToSocketServer(address string, retries int, retrySleepTime time.Duration, hideTimeoutErrors bool) (*websocket.Conn, error) {
|
||||
func ConnectToSocketServer(address string) (*websocket.Conn, error) {
|
||||
var err error
|
||||
var connection *websocket.Conn
|
||||
try := 0
|
||||
|
||||
// Connection to server fails if client pod is up before server.
|
||||
// Retries solve this issue.
|
||||
for try < retries {
|
||||
for try < DEFAULT_SOCKET_RETRIES {
|
||||
rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES)
|
||||
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
|
||||
if err != nil {
|
||||
rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err)
|
||||
try++
|
||||
if !hideTimeoutErrors {
|
||||
fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err)
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
time.Sleep(retrySleepTime)
|
||||
time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME)
|
||||
}
|
||||
|
||||
if err != nil {
|
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/romana/rlog"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@ -18,8 +17,8 @@ import (
|
||||
func StartServer(app *gin.Engine) {
|
||||
signals := make(chan os.Signal, 2)
|
||||
signal.Notify(signals,
|
||||
os.Interrupt, // this catch ctrl + c
|
||||
syscall.SIGTSTP, // this catch ctrl + z
|
||||
os.Interrupt, // this catch ctrl + c
|
||||
syscall.SIGTSTP, // this catch ctrl + z
|
||||
)
|
||||
|
||||
srv := &http.Server{
|
||||
@ -36,8 +35,9 @@ func StartServer(app *gin.Engine) {
|
||||
}()
|
||||
|
||||
// Run server.
|
||||
rlog.Infof("Starting the server...")
|
||||
if err := app.Run(":8899"); err != nil {
|
||||
log.Printf("Oops... Server is not running! Reason: %v", err)
|
||||
rlog.Errorf("Server is not running! Reason: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,15 +54,15 @@ func ReverseSlice(data interface{}) {
|
||||
|
||||
func CheckErr(e error) {
|
||||
if e != nil {
|
||||
log.Printf("%v", e)
|
||||
rlog.Infof("%v", e)
|
||||
//panic(e)
|
||||
}
|
||||
}
|
||||
|
||||
func SetHostname(address, newHostname string) string {
|
||||
replacedUrl, err := url.Parse(address)
|
||||
if err != nil{
|
||||
log.Printf("error replacing hostname to %s in address %s, returning original %v",newHostname, address, err)
|
||||
if err != nil {
|
||||
rlog.Infof("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
|
||||
return address
|
||||
}
|
||||
replacedUrl.Host = newHostname
|
||||
|
@ -1,42 +0,0 @@
|
||||
package mizu
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
core "k8s.io/api/core/v1"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ControlSocket struct {
|
||||
connection *websocket.Conn
|
||||
}
|
||||
|
||||
func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) {
|
||||
connection, err := shared.ConnectToSocketServer(socketServerAddress, 30, 2 * time.Second, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return &ControlSocket{connection: connection}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (controlSocket *ControlSocket) SendNewTappedPodsListMessage(pods []core.Pod) error {
|
||||
podInfos := make([]shared.PodInfo, 0)
|
||||
for _, pod := range pods {
|
||||
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
|
||||
}
|
||||
tapStatus := shared.TapStatus{Pods: podInfos}
|
||||
socketMessage := shared.CreateWebSocketStatusMessage(tapStatus)
|
||||
|
||||
jsonMessage, err := json.Marshal(socketMessage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = controlSocket.connection.WriteMessage(websocket.TextMessage, jsonMessage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user