diff --git a/Dockerfile b/Dockerfile index 646956a64..af7dd4ca1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,6 +53,7 @@ WORKDIR /app COPY --from=builder ["/app/agent-build/mizuagent", "."] COPY --from=builder ["/app/agent/build/extensions", "extensions"] COPY --from=site-build ["/app/ui-build/build", "site"] +RUN mkdir /app/data/ # gin-gonic runs in debug mode without this ENV GIN_MODE=release diff --git a/acceptanceTests/testsUtils.go b/acceptanceTests/testsUtils.go index ddf49d982..9c7ad955a 100644 --- a/acceptanceTests/testsUtils.go +++ b/acceptanceTests/testsUtils.go @@ -17,12 +17,13 @@ import ( ) const ( - longRetriesCount = 100 - shortRetriesCount = 10 - defaultApiServerPort = shared.DefaultApiServerPort - defaultNamespaceName = "mizu-tests" - defaultServiceName = "httpbin" - defaultEntriesCount = 50 + longRetriesCount = 100 + shortRetriesCount = 10 + defaultApiServerPort = shared.DefaultApiServerPort + defaultNamespaceName = "mizu-tests" + defaultServiceName = "httpbin" + defaultEntriesCount = 50 + waitAfterTapPodsReady = 3 * time.Second ) func getCliPath() (string, error) { @@ -141,7 +142,7 @@ func waitTapPodsReady(apiServerUrl string) error { if tappersCount == 0 { return fmt.Errorf("no tappers running") } - + time.Sleep(waitAfterTapPodsReady) return nil } diff --git a/agent/go.sum b/agent/go.sum index ca5ace3f1..3994ea223 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -103,6 +103,7 @@ github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZMPRZwes7CROmyNKgQzC3XPs6L/G2EJLHddWejkmf4= github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc= @@ -229,6 +230,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -277,6 +279,7 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -306,6 +309,7 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= @@ -314,6 +318,7 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -393,6 +398,7 @@ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -417,6 +423,7 @@ github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAv github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -825,7 +832,9 @@ k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts= k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= +k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 h1:vEx13qjvaZ4yfObSSXW7BrMc/KQBBT/Jyee8XtLf4x0= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= +k8s.io/kubectl v0.21.2 h1:9XPCetvOMDqrIZZXb1Ei+g8t6KrIp9ENJaysQjUuLiE= k8s.io/kubectl v0.21.2/go.mod h1:PgeUclpG8VVmmQIl8zpLar3IQEpFc9mrmvlwY3CK1xo= k8s.io/metrics v0.21.2/go.mod h1:wzlOINZMCtWq8dR9gHlyaOemmYlOpAoldEIXE82gAhI= k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw= diff --git a/agent/main.go b/agent/main.go index 00e7d55d8..51ffc9a41 100644 --- a/agent/main.go +++ b/agent/main.go @@ -1,14 +1,20 @@ package main import ( + "context" "encoding/json" + "errors" "flag" "fmt" + "github.com/up9inc/mizu/shared/kubernetes" "io/ioutil" + v1 "k8s.io/api/core/v1" "mizuserver/pkg/api" "mizuserver/pkg/config" "mizuserver/pkg/controllers" + "mizuserver/pkg/database" "mizuserver/pkg/models" + "mizuserver/pkg/providers" "mizuserver/pkg/routes" "mizuserver/pkg/up9" "mizuserver/pkg/utils" @@ -19,6 +25,7 @@ import ( "path/filepath" "plugin" "sort" + "syscall" "time" "github.com/gin-contrib/static" @@ -45,6 +52,7 @@ var extensionsMap map[string]*tapApi.Extension // global const ( socketConnectionRetries = 10 socketConnectionRetryDelay = time.Second * 2 + socketHandshakeTimeout = time.Second * 2 ) func main() { @@ -101,6 +109,7 @@ func main() { go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel) } else if *apiServerMode { + database.InitDataBase(config.Config.AgentDatabasePath) api.StartResolving(*namespace) outputItemsChannel := make(chan *tapApi.OutputChannelItem) @@ -197,6 +206,15 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { routes.StatusRoutes(app) routes.NotFoundRoute(app) + if config.Config.DaemonMode { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := startMizuTapperSyncer(ctx); err != nil { + logger.Log.Fatalf("error initializing tapper syncer: %+v", err) + } + } + utils.StartServer(app) } @@ -296,6 +314,15 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha err = connection.WriteMessage(websocket.TextMessage, marshaledData) if err != nil { logger.Log.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err) + if errors.Is(err, syscall.EPIPE) { + logger.Log.Warning("detected socket disconnection, reestablishing socket connection") + connection, err = dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay) + if err != nil { + logger.Log.Fatalf("error reestablishing socket connection: %v", err) + } else { + logger.Log.Info("recovered connection successfully") + } + } continue } } @@ -326,17 +353,77 @@ func determineLogLevel() (logLevel logging.Level) { func dialSocketWithRetry(socketAddress string, retryAmount int, retryDelay time.Duration) (*websocket.Conn, error) { var lastErr error + dialer := &websocket.Dialer{ // we use our own dialer instead of the default due to the default's 45 sec handshake timeout, we occasionally encounter hanging socket handshakes when tapper tries to connect to api too soon + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: socketHandshakeTimeout, + } for i := 1; i < retryAmount; i++ { - socketConnection, _, err := websocket.DefaultDialer.Dial(socketAddress, nil) + socketConnection, _, err := dialer.Dial(socketAddress, nil) if err != nil { if i < retryAmount { - logger.Log.Debugf("socket connection to %s failed: %v, retrying %d out of %d in %d seconds...", socketAddress, err, i, retryAmount, retryDelay / time.Second) + logger.Log.Infof("socket connection to %s failed: %v, retrying %d out of %d in %d seconds...", socketAddress, err, i, retryAmount, retryDelay / time.Second) time.Sleep(retryDelay) } } else { - logger.Log.Debugf("socket connection to %s successful", socketAddress) return socketConnection, nil } } return nil, lastErr -} \ No newline at end of file +} + + +func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, error){ + provider, err := kubernetes.NewProviderInCluster() + if err != nil { + return nil, err + } + + tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ + TargetNamespaces: config.Config.TargetNamespaces, + PodFilterRegex: config.Config.TapTargetRegex.Regexp, + MizuResourcesNamespace: config.Config.MizuResourcesNamespace, + AgentImage: config.Config.AgentImage, + TapperResources: config.Config.TapperResources, + ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy), + DumpLogs: config.Config.DumpLogs, + IgnoredUserAgents: config.Config.IgnoredUserAgents, + MizuApiFilteringOptions: config.Config.MizuApiFilteringOptions, + MizuServiceAccountExists: true, //assume service account exists since daemon mode will not function without it anyway + }) + + if err != nil { + return nil, err + } + + // handle tapperSyncer events (pod changes and errors) + go func() { + for { + select { + case syncerErr, ok := <-tapperSyncer.ErrorOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop") + return + } + logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr) + case _, ok := <-tapperSyncer.TapPodChangesOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop") + return + } + tapStatus := shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)} + + serializedTapStatus, err := json.Marshal(shared.CreateWebSocketStatusMessage(tapStatus)) + if err != nil { + logger.Log.Fatalf("error serializing tap status: %v", err) + } + api.BroadcastToBrowserClients(serializedTapStatus) + providers.TapStatus.Pods = tapStatus.Pods + case <-ctx.Done(): + logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") + return + } + } + }() + + return tapperSyncer, nil +} diff --git a/agent/pkg/config/config.go b/agent/pkg/config/config.go index 0104a52b1..ac002e0c0 100644 --- a/agent/pkg/config/config.go +++ b/agent/pkg/config/config.go @@ -13,6 +13,7 @@ import ( const ( defaultMaxDatabaseSizeBytes int64 = 200 * 1000 * 1000 defaultRegexTarget string = ".*" + DefaultDatabasePath string = "./entries" ) var Config *shared.MizuAgentConfig @@ -52,7 +53,9 @@ func getDefaultConfig() (*shared.MizuAgentConfig, error) { return nil, err } return &shared.MizuAgentConfig{ - TapTargetRegex: *regex, - MaxDBSizeBytes: defaultMaxDatabaseSizeBytes, + TapTargetRegex: *regex, + MaxDBSizeBytes: defaultMaxDatabaseSizeBytes, + AgentDatabasePath: DefaultDatabasePath, + DaemonMode: false, }, nil } diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index 16c8d33ee..fab3bb690 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -14,6 +14,15 @@ import ( "github.com/up9inc/mizu/shared/logger" ) +func HealthCheck(c *gin.Context) { + response := shared.HealthResponse{ + TapStatus: providers.TapStatus, + TappersCount: providers.TappersCount, + } + c.JSON(http.StatusOK, response) +} + + func PostTappedPods(c *gin.Context) { tapStatus := &shared.TapStatus{} if err := c.Bind(tapStatus); err != nil { diff --git a/agent/pkg/database/main.go b/agent/pkg/database/main.go index befc4bb92..6d199823c 100644 --- a/agent/pkg/database/main.go +++ b/agent/pkg/database/main.go @@ -13,7 +13,6 @@ import ( ) const ( - DBPath = "./entries.db" OrderDesc = "desc" OrderAsc = "asc" LT = "lt" @@ -34,10 +33,7 @@ var ( } ) -func init() { - DB = initDataBase(DBPath) - go StartEnforcingDatabaseSize() -} +var DBPath string func GetEntriesTable() *gorm.DB { return DB.Table("mizu_entries") @@ -50,12 +46,14 @@ func CreateEntry(entry *tapApi.MizuEntry) { GetEntriesTable().Create(entry) } -func initDataBase(databasePath string) *gorm.DB { - temp, _ := gorm.Open(sqlite.Open(databasePath), &gorm.Config{ +func InitDataBase(databasePath string) *gorm.DB { + DBPath = databasePath + DB, _ = gorm.Open(sqlite.Open(databasePath), &gorm.Config{ Logger: &utils.TruncatingLogger{LogLevel: logger.Warn, SlowThreshold: 500 * time.Millisecond}, }) - _ = temp.AutoMigrate(&tapApi.MizuEntry{}) // this will ensure table is created - return temp + _ = DB.AutoMigrate(&tapApi.MizuEntry{}) // this will ensure table is created + go StartEnforcingDatabaseSize() + return DB } func GetEntriesFromDb(timeFrom time.Time, timeTo time.Time, protocolName *string) []tapApi.MizuEntry { diff --git a/agent/pkg/providers/stats_provider_test.go b/agent/pkg/providers/stats_provider_test.go index 13acfece9..4e0d3ff62 100644 --- a/agent/pkg/providers/stats_provider_test.go +++ b/agent/pkg/providers/stats_provider_test.go @@ -2,10 +2,16 @@ package providers_test import ( "fmt" + "mizuserver/pkg/config" + "mizuserver/pkg/database" "mizuserver/pkg/providers" "testing" ) +func init() { + database.InitDataBase(config.DefaultDatabasePath) +} + func TestNoEntryAddedCount(t *testing.T) { entriesStats := providers.GetGeneralStats() diff --git a/agent/pkg/routes/status_routes.go b/agent/pkg/routes/status_routes.go index 54d623aae..505e0b5f5 100644 --- a/agent/pkg/routes/status_routes.go +++ b/agent/pkg/routes/status_routes.go @@ -8,6 +8,8 @@ import ( func StatusRoutes(ginApp *gin.Engine) { routeGroup := ginApp.Group("/status") + routeGroup.GET("/health", controllers.HealthCheck) + routeGroup.POST("/tappedPods", controllers.PostTappedPods) routeGroup.GET("/tappersCount", controllers.GetTappersCount) routeGroup.GET("/tap", controllers.GetTappingStatus) diff --git a/cli/apiserver/provider.go b/cli/apiserver/provider.go index 09f0c2014..71f1a4cf3 100644 --- a/cli/apiserver/provider.go +++ b/cli/apiserver/provider.go @@ -3,7 +3,9 @@ package apiserver import ( "bytes" "encoding/json" + "errors" "fmt" + "github.com/up9inc/mizu/shared/kubernetes" "io/ioutil" "net/http" "net/url" @@ -15,30 +17,30 @@ import ( core "k8s.io/api/core/v1" ) -type apiServerProvider struct { +type Provider struct { url string - isReady bool retries int + client *http.Client } -var Provider = apiServerProvider{retries: config.GetIntEnvConfig(config.ApiServerRetries, 20)} +const DefaultRetries = 20 +const DefaultTimeout = 5 * time.Second -func (provider *apiServerProvider) InitAndTestConnection(url string) error { - healthUrl := fmt.Sprintf("%s/", url) +func NewProvider(url string, retries int, timeout time.Duration) *Provider { + return &Provider{ + url: url, + retries: config.GetIntEnvConfig(config.ApiServerRetries, retries), + client: &http.Client{ + Timeout: timeout, + }, + } +} + +func (provider *Provider) TestConnection() error { retriesLeft := provider.retries for retriesLeft > 0 { - if response, err := http.Get(healthUrl); err != nil { - logger.Log.Debugf("[ERROR] failed connecting to api server %v", err) - } else if response.StatusCode != 200 { - responseBody := "" - data, readErr := ioutil.ReadAll(response.Body) - if readErr == nil { - responseBody = string(data) - } - - logger.Log.Debugf("can't connect to api server yet, response status code: %v, body: %v", response.StatusCode, responseBody) - - response.Body.Close() + if _, err := provider.GetHealthStatus(); err != nil { + logger.Log.Debugf("[ERROR] api server not ready yet %v", err) } else { logger.Log.Debugf("connection test to api server passed successfully") break @@ -48,30 +50,38 @@ func (provider *apiServerProvider) InitAndTestConnection(url string) error { } if retriesLeft == 0 { - provider.isReady = false return fmt.Errorf("couldn't reach the api server after %v retries", provider.retries) } - provider.url = url - provider.isReady = true return nil } -func (provider *apiServerProvider) ReportTappedPods(pods []core.Pod) error { - if !provider.isReady { - return fmt.Errorf("trying to reach api server when not initialized yet") +func (provider *Provider) GetHealthStatus() (*shared.HealthResponse, error) { + healthUrl := fmt.Sprintf("%s/status/health", provider.url) + if response, err := provider.client.Get(healthUrl); err != nil { + return nil, err + } else if response.StatusCode > 299 { + return nil, errors.New(fmt.Sprintf("status code: %d", response.StatusCode)) + } else { + defer response.Body.Close() + + healthResponse := &shared.HealthResponse{} + if err := json.NewDecoder(response.Body).Decode(&healthResponse); err != nil { + return nil, err + } + return healthResponse, nil } +} + +func (provider *Provider) ReportTappedPods(pods []core.Pod) error { tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url) - podInfos := make([]shared.PodInfo, 0) - for _, pod := range pods { - podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace}) - } + podInfos := kubernetes.GetPodInfosForPods(pods) tapStatus := shared.TapStatus{Pods: podInfos} if jsonValue, err := json.Marshal(tapStatus); err != nil { return fmt.Errorf("failed Marshal the tapped pods %w", err) } else { - if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { + if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { return fmt.Errorf("failed sending to API server the tapped pods %w", err) } else if response.StatusCode != 200 { return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode) @@ -82,20 +92,17 @@ func (provider *apiServerProvider) ReportTappedPods(pods []core.Pod) error { } } -func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, error) { - if !provider.isReady { - return nil, fmt.Errorf("trying to reach api server when not initialized yet") - } +func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) { generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url) - response, requestErr := http.Get(generalStatsUrl) + response, requestErr := provider.client.Get(generalStatsUrl) if requestErr != nil { return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr) } else if response.StatusCode != 200 { return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode) } - defer func() { _ = response.Body.Close() }() + defer response.Body.Close() data, readErr := ioutil.ReadAll(response.Body) if readErr != nil { @@ -109,16 +116,13 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er return generalStats, nil } -func (provider *apiServerProvider) GetVersion() (string, error) { - if !provider.isReady { - return "", fmt.Errorf("trying to reach api server when not initialized yet") - } +func (provider *Provider) GetVersion() (string, error) { versionUrl, _ := url.Parse(fmt.Sprintf("%s/metadata/version", provider.url)) req := &http.Request{ Method: http.MethodGet, URL: versionUrl, } - statusResp, err := http.DefaultClient.Do(req) + statusResp, err := provider.client.Do(req) if err != nil { return "", err } diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index 38adef8ad..f4ed44605 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -112,4 +112,5 @@ func init() { tapCmd.Flags().StringP(configStructs.WorkspaceTapName, "w", defaultTapConfig.Workspace, "Uploads traffic to your UP9 workspace for further analysis (requires auth)") tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules") tapCmd.Flags().String(configStructs.ContractFile, defaultTapConfig.ContractFile, "OAS/Swagger file to validate to monitor the contracts") + tapCmd.Flags().Bool(configStructs.DaemonModeTapName, defaultTapConfig.DaemonMode, "Run mizu in daemon mode, detached from the cli") } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index e03430645..988b3ca58 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -8,20 +8,19 @@ import ( "io/ioutil" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "path" "regexp" "strings" "time" - "gopkg.in/yaml.v3" - core "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" - "github.com/getkin/kin-openapi/openapi3" "github.com/up9inc/mizu/cli/apiserver" "github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/errormessage" + "gopkg.in/yaml.v3" + core "k8s.io/api/core/v1" "github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/cli/mizu/fsUtils" @@ -42,9 +41,11 @@ type tapState struct { } var state tapState +var apiProvider *apiserver.Provider func RunMizuTap() { mizuApiFilteringOptions, err := getMizuApiFilteringOptions() + apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout) if err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error parsing regex-masking: %v", errormessage.FormatError(err))) return @@ -83,12 +84,6 @@ func RunMizuTap() { } } - serializedMizuConfig, err := config.GetSerializedMizuConfig() - if err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error composing mizu config: %v", errormessage.FormatError(err))) - return - } - kubernetesProvider, err := getKubernetesProviderForCli() if err != nil { return @@ -99,6 +94,12 @@ func RunMizuTap() { targetNamespaces := getNamespaces(kubernetesProvider) + serializedMizuConfig, err := config.GetSerializedMizuAgentConfig(targetNamespaces, mizuApiFilteringOptions) + if err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error composing mizu config: %v", errormessage.FormatError(err))) + return + } + if config.Config.IsNsRestrictedMode() { if len(targetNamespaces) != 1 || !shared.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) { logger.Log.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+ @@ -120,7 +121,7 @@ func RunMizuTap() { return } - if err := createMizuResources(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil { + if err := createMizuResources(ctx, cancel, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err))) var statusError *k8serrors.StatusError @@ -131,21 +132,66 @@ func RunMizuTap() { } return } - defer finishMizuExecution(kubernetesProvider) + if config.Config.Tap.DaemonMode { + if err := handleDaemonModePostCreation(cancel, kubernetesProvider); err != nil { + defer finishMizuExecution(kubernetesProvider) + cancel() + } else { + logger.Log.Infof(uiUtils.Magenta, "Mizu is now running in daemon mode, run `mizu view` to connect to the mizu daemon instance") + } + } else { + defer finishMizuExecution(kubernetesProvider) - if err = startTapManager(ctx, cancel, kubernetesProvider, targetNamespaces, *mizuApiFilteringOptions); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error listing pods: %v", err)) - cancel() + if err = startTapperSyncer(ctx, cancel, kubernetesProvider, targetNamespaces, *mizuApiFilteringOptions); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error starting mizu tapper syncer: %v", err)) + cancel() + } + + go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel) + go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel) + + // block until exit signal or error + waitForFinish(ctx, cancel) } - - go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions) - go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel) - - // block until exit signal or error - waitForFinish(ctx, cancel) } -func startTapManager(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, mizuApiFilteringOptions api.TrafficFilteringOptions) error { +func handleDaemonModePostCreation(cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) error { + apiProvider := apiserver.NewProvider(GetApiServerUrl(), 90, 1*time.Second) + + if err := waitForDaemonModeToBeReady(cancel, kubernetesProvider, apiProvider); err != nil { + return err + } + if err := printDaemonModeTappedPods(apiProvider); err != nil { + return err + } + + return nil +} + +func printDaemonModeTappedPods(apiProvider *apiserver.Provider) error { + if healthStatus, err := apiProvider.GetHealthStatus(); err != nil { + return err + } else { + for _, tappedPod := range healthStatus.TapStatus.Pods { + logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name)) + } + } + return nil +} + +func waitForDaemonModeToBeReady(cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) error { + logger.Log.Info("Waiting for mizu to be ready... (may take a few minutes)") + go startProxyReportErrorIfAny(kubernetesProvider, cancel) + + // TODO: TRA-3903 add a smarter test to see that tapping/pod watching is functioning properly + if err := apiProvider.TestConnection(); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Mizu was not ready in time, for more info check logs at %s", fsUtils.GetLogFilePath())) + return err + } + return nil +} + +func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, mizuApiFilteringOptions api.TrafficFilteringOptions) error { tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ TargetNamespaces: targetNamespaces, PodFilterRegex: *config.Config.Tap.PodRegex(), @@ -178,11 +224,19 @@ func startTapManager(ctx context.Context, cancel context.CancelFunc, provider *k go func() { for { select { - case managerErr := <-tapperSyncer.ErrorOut: - logger.Log.Errorf(uiUtils.Error, getErrorDisplayTextForK8sTapManagerError(managerErr)) + case syncerErr, ok := <-tapperSyncer.ErrorOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop") + return + } + logger.Log.Errorf(uiUtils.Error, getErrorDisplayTextForK8sTapManagerError(syncerErr)) cancel() - case <-tapperSyncer.TapPodChangesOut: - if err := apiserver.Provider.ReportTappedPods(tapperSyncer.CurrentlyTappedPods); err != nil { + case _, ok := <-tapperSyncer.TapPodChangesOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop") + return + } + if err := apiProvider.ReportTappedPods(tapperSyncer.CurrentlyTappedPods); err != nil { logger.Log.Debugf("[Error] failed update tapped pods %v", err) } case <-ctx.Done(): @@ -219,40 +273,21 @@ func readValidationRules(file string) (string, error) { return string(newContent), nil } -func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error { +func createMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error { if !config.Config.IsNsRestrictedMode() { if err := createMizuNamespace(ctx, kubernetesProvider); err != nil { return err } } - if err := createMizuApiServer(ctx, kubernetesProvider); err != nil { - return err - } - if err := createMizuConfigmap(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil { logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err))) } - return nil -} - -func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error { - err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName, serializedValidationRules, serializedContract, serializedMizuConfig) - return err -} - -func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Provider) error { - _, err := kubernetesProvider.CreateNamespace(ctx, config.Config.MizuResourcesNamespace) - return err -} - -func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider) error { var err error - state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider) if err != nil { - logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to ensure the resources required for IP resolving. Mizu will not resolve target IPs to names. error: %v", errormessage.FormatError(err))) + return err } var serviceAccountName string @@ -273,11 +308,20 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro Resources: config.Config.Tap.ApiServerResources, ImagePullPolicy: config.Config.ImagePullPolicy(), } - _, err = kubernetesProvider.CreateMizuApiServerPod(ctx, opts) - if err != nil { - return err + + if config.Config.Tap.DaemonMode { + if !state.mizuServiceAccountExists { + defer cleanUpMizuResources(ctx, cancel, kubernetesProvider) + logger.Log.Fatalf(uiUtils.Red, fmt.Sprintf("Failed to ensure the resources required for mizu to run in daemon mode. cannot proceed. error: %v", errormessage.FormatError(err))) + } + if err := createMizuApiServerDeployment(ctx, kubernetesProvider, opts); err != nil { + return err + } + } else { + if err := createMizuApiServerPod(ctx, kubernetesProvider, opts); err != nil { + return err + } } - logger.Log.Debugf("Successfully created API server pod: %s", kubernetes.ApiServerPodName) state.apiServerService, err = kubernetesProvider.CreateService(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, kubernetes.ApiServerPodName) if err != nil { @@ -288,6 +332,57 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro return nil } +func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error { + err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName, serializedValidationRules, serializedContract, serializedMizuConfig) + return err +} + +func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Provider) error { + _, err := kubernetesProvider.CreateNamespace(ctx, config.Config.MizuResourcesNamespace) + return err +} + +func createMizuApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error { + pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, false, "") + if err != nil { + return err + } + if _, err = kubernetesProvider.CreatePod(ctx, config.Config.MizuResourcesNamespace, pod); err != nil { + return err + } + logger.Log.Debugf("Successfully created API server pod: %s", kubernetes.ApiServerPodName) + return nil +} + +func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error { + isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx) + volumeClaimCreated := false + if err != nil { + return err + } + if isDefaultStorageClassAvailable { + if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil { + logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this will mean that mizu's data will be lost on pod restart") + logger.Log.Debugf("error creating persistent volume claim: %v", err) + } else { + volumeClaimCreated = true + } + } else { + logger.Log.Warningf(uiUtils.Yellow, "Could not find default volume provider in this cluster, this will mean that mizu's data will be lost on pod restart") + } + + pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, volumeClaimCreated, kubernetes.PersistentVolumeClaimName) + if err != nil { + return err + } + + if _, err = kubernetesProvider.CreateDeployment(ctx, config.Config.MizuResourcesNamespace, opts.PodName, pod); err != nil { + return err + } + logger.Log.Debugf("Successfully created API server deployment: %s", kubernetes.ApiServerPodName) + return nil +} + func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) { var compiledRegexSlice []*api.SerializableRegexp @@ -323,7 +418,7 @@ func getSyncEntriesConfig() *shared.SyncEntriesConfig { } func finishMizuExecution(kubernetesProvider *kubernetes.Provider) { - telemetry.ReportAPICalls() + telemetry.ReportAPICalls(apiProvider) removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout) defer cancel() dumpLogsIfNeeded(removalCtx, kubernetesProvider) @@ -364,11 +459,6 @@ func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubern func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string { leftoverResources := make([]string, 0) - if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil { - resourceDesc := fmt.Sprintf("Pod %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace) - handleDeletionError(err, resourceDesc, &leftoverResources) - } - if err := kubernetesProvider.RemoveService(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil { resourceDesc := fmt.Sprintf("Service %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace) handleDeletionError(err, resourceDesc, &leftoverResources) @@ -394,11 +484,37 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P handleDeletionError(err, resourceDesc, &leftoverResources) } + if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil { + resourceDesc := fmt.Sprintf("Pod %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + //daemon mode resources if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleBindingName); err != nil { resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", kubernetes.RoleBindingName, config.Config.MizuResourcesNamespace) handleDeletionError(err, resourceDesc, &leftoverResources) } + if err := kubernetesProvider.RemoveDeployment(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil { + resourceDesc := fmt.Sprintf("Deployment %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + if err := kubernetesProvider.RemovePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName); err != nil { + resourceDesc := fmt.Sprintf("PersistentVolumeClaim %s in namespace %s", kubernetes.PersistentVolumeClaimName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + if err := kubernetesProvider.RemoveRole(ctx, config.Config.MizuResourcesNamespace, kubernetes.DaemonRoleName); err != nil { + resourceDesc := fmt.Sprintf("Role %s in namespace %s", kubernetes.DaemonRoleName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, kubernetes.DaemonRoleBindingName); err != nil { + resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", kubernetes.DaemonRoleBindingName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + return leftoverResources } @@ -448,7 +564,7 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k } } -func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) { +func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName)) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex) isPodReady := false @@ -500,7 +616,7 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi go startProxyReportErrorIfAny(kubernetesProvider, cancel) url := GetApiServerUrl() - if err := apiserver.Provider.InitAndTestConnection(url); err != nil { + if err := apiProvider.TestConnection(); err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath())) cancel() break @@ -508,7 +624,7 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi logger.Log.Infof("Mizu is available at %s\n", url) uiUtils.OpenBrowser(url) - if err := apiserver.Provider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil { + if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil { logger.Log.Debugf("[Error] failed update tapped pods %v", err) } } @@ -599,27 +715,34 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider } } -func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, error) { - if !config.Config.IsNsRestrictedMode() { - err := kubernetesProvider.CreateMizuRBAC(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.ClusterRoleName, kubernetes.ClusterRoleBindingName, mizu.RBACVersion) - if err != nil { - return false, err - } - } else { - err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.RoleName, kubernetes.RoleBindingName, mizu.RBACVersion) - if err != nil { - return false, err - } - } - return true, nil -} - func getNamespaces(kubernetesProvider *kubernetes.Provider) []string { if config.Config.Tap.AllNamespaces { return []string{kubernetes.K8sAllNamespaces} } else if len(config.Config.Tap.Namespaces) > 0 { return shared.Unique(config.Config.Tap.Namespaces) } else { - return []string{kubernetesProvider.CurrentNamespace()} + currentNamespace, err := kubernetesProvider.CurrentNamespace() + if err != nil { + logger.Log.Fatalf(uiUtils.Red, fmt.Sprintf("error getting current namespace: %+v", err)) + } + return []string{currentNamespace} } } + +func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, error) { + if !config.Config.IsNsRestrictedMode() { + if err := kubernetesProvider.CreateMizuRBAC(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.ClusterRoleName, kubernetes.ClusterRoleBindingName, mizu.RBACVersion); err != nil { + return false, err + } + } else { + if err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.RoleName, kubernetes.RoleBindingName, mizu.RBACVersion); err != nil { + return false, err + } + } + if config.Config.Tap.DaemonMode { + if err := kubernetesProvider.CreateDaemonsetRBAC(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.DaemonRoleName, kubernetes.DaemonRoleBindingName, mizu.RBACVersion); err != nil { + return false, err + } + } + return true, nil +} diff --git a/cli/cmd/viewRunner.go b/cli/cmd/viewRunner.go index c05690d20..a342b484c 100644 --- a/cli/cmd/viewRunner.go +++ b/cli/cmd/viewRunner.go @@ -48,17 +48,19 @@ func runMizuView() { logger.Log.Infof("Establishing connection to k8s cluster...") go startProxyReportErrorIfAny(kubernetesProvider, cancel) - if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath())) - return - } + } + + apiServerProvider := apiserver.NewProvider(url, apiserver.DefaultRetries, apiserver.DefaultTimeout) + if err := apiServerProvider.TestConnection(); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath())) + return } logger.Log.Infof("Mizu is available at %s\n", url) uiUtils.OpenBrowser(url) - if isCompatible, err := version.CheckVersionCompatibility(); err != nil { + if isCompatible, err := version.CheckVersionCompatibility(apiServerProvider); err != nil { logger.Log.Errorf("Failed to check versions compatibility %v", err) cancel() return diff --git a/cli/config/config.go b/cli/config/config.go index 753b7f429..386f4816a 100644 --- a/cli/config/config.go +++ b/cli/config/config.go @@ -367,8 +367,8 @@ func setZeroForReadonlyFields(currentElem reflect.Value) { } } -func GetSerializedMizuConfig() (string, error) { - mizuConfig, err := getMizuConfig() +func GetSerializedMizuAgentConfig(targetNamespaces []string, mizuApiFilteringOptions *api.TrafficFilteringOptions) (string, error) { + mizuConfig, err := getMizuAgentConfig(targetNamespaces, mizuApiFilteringOptions) if err != nil { return "", err } @@ -379,14 +379,24 @@ func GetSerializedMizuConfig() (string, error) { return string(serializedConfig), nil } -func getMizuConfig() (*shared.MizuAgentConfig, error) { +func getMizuAgentConfig(targetNamespaces []string, mizuApiFilteringOptions *api.TrafficFilteringOptions) (*shared.MizuAgentConfig, error) { serializableRegex, err := api.CompileRegexToSerializableRegexp(Config.Tap.PodRegexStr) if err != nil { return nil, err } config := shared.MizuAgentConfig{ - TapTargetRegex: *serializableRegex, - MaxDBSizeBytes: Config.Tap.MaxEntriesDBSizeBytes(), + TapTargetRegex: *serializableRegex, + MaxDBSizeBytes: Config.Tap.MaxEntriesDBSizeBytes(), + DaemonMode: Config.Tap.DaemonMode, + TargetNamespaces: targetNamespaces, + AgentImage: Config.AgentImage, + PullPolicy: Config.ImagePullPolicyStr, + DumpLogs: Config.DumpLogs, + IgnoredUserAgents: Config.Tap.IgnoredUserAgents, + TapperResources: Config.Tap.TapperResources, + MizuResourcesNamespace: Config.MizuResourcesNamespace, + MizuApiFilteringOptions: *mizuApiFilteringOptions, + AgentDatabasePath: fmt.Sprintf("%s%s", shared.DataDirPath, "entries.db"), } return &config, nil } diff --git a/cli/config/configStructs/tapConfig.go b/cli/config/configStructs/tapConfig.go index 0d26ec893..38fb187db 100644 --- a/cli/config/configStructs/tapConfig.go +++ b/cli/config/configStructs/tapConfig.go @@ -21,6 +21,7 @@ const ( WorkspaceTapName = "workspace" EnforcePolicyFile = "traffic-validation-file" ContractFile = "contract" + DaemonModeTapName = "daemon" ) type TapConfig struct { @@ -42,6 +43,7 @@ type TapConfig struct { AskUploadConfirmation bool `yaml:"ask-upload-confirmation" default:"true"` ApiServerResources shared.Resources `yaml:"api-server-resources"` TapperResources shared.Resources `yaml:"tapper-resources"` + DaemonMode bool `yaml:"daemon" default:"false"` } func (config *TapConfig) PodRegex() *regexp.Regexp { diff --git a/cli/mizu/consts.go b/cli/mizu/consts.go index ab9717b19..7c5ce60fa 100644 --- a/cli/mizu/consts.go +++ b/cli/mizu/consts.go @@ -6,11 +6,12 @@ import ( ) var ( - SemVer = "0.0.1" - Branch = "develop" - GitCommitHash = "" // this var is overridden using ldflags in makefile when building - BuildTimestamp = "" // this var is overridden using ldflags in makefile when building - RBACVersion = "v1" + SemVer = "0.0.1" + Branch = "develop" + GitCommitHash = "" // this var is overridden using ldflags in makefile when building + BuildTimestamp = "" // this var is overridden using ldflags in makefile when building + RBACVersion = "v1" + DaemonModePersistentVolumeSizeBufferBytes = int64(500 * 1000 * 1000) //500mb ) func GetMizuFolderPath() string { diff --git a/cli/mizu/version/versionCheck.go b/cli/mizu/version/versionCheck.go index 32f5e01f2..ceefe0921 100644 --- a/cli/mizu/version/versionCheck.go +++ b/cli/mizu/version/versionCheck.go @@ -18,8 +18,8 @@ import ( "github.com/up9inc/mizu/shared/semver" ) -func CheckVersionCompatibility() (bool, error) { - apiSemVer, err := apiserver.Provider.GetVersion() +func CheckVersionCompatibility(apiServerProvider *apiserver.Provider) (bool, error) { + apiSemVer, err := apiServerProvider.GetVersion() if err != nil { return false, err } diff --git a/cli/telemetry/telemetry.go b/cli/telemetry/telemetry.go index cc1f9bdbd..1bd10acfd 100644 --- a/cli/telemetry/telemetry.go +++ b/cli/telemetry/telemetry.go @@ -35,13 +35,13 @@ func ReportRun(cmd string, args interface{}) { logger.Log.Debugf("successfully reported telemetry for cmd %v", cmd) } -func ReportAPICalls() { +func ReportAPICalls(apiProvider *apiserver.Provider) { if !shouldRunTelemetry() { logger.Log.Debugf("not reporting telemetry") return } - generalStats, err := apiserver.Provider.GetGeneralStats() + generalStats, err := apiProvider.GetGeneralStats() if err != nil { logger.Log.Debugf("[ERROR] failed get general stats from api server %v", err) return diff --git a/examples/roles/permissions-all-namespaces-daemon.yaml b/examples/roles/permissions-all-namespaces-daemon.yaml new file mode 100644 index 000000000..5a32eaaf5 --- /dev/null +++ b/examples/roles/permissions-all-namespaces-daemon.yaml @@ -0,0 +1,64 @@ +# This example shows the roles required for a user to be able to use Mizu in all namespaces. +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: mizu-runner-clusterrole +rules: + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "watch", "delete"] + - apiGroups: [ "" ] + resources: [ "deployments" ] + verbs: [ "create", "delete" ] + - apiGroups: [""] + resources: ["services"] + verbs: ["get", "list", "watch", "create", "delete"] + - apiGroups: ["apps"] + resources: ["daemonsets"] + verbs: ["create", "patch", "delete"] + - apiGroups: [""] + resources: ["namespaces"] + verbs: ["get", "list", "watch", "create", "delete"] + - apiGroups: [""] + resources: ["services/proxy"] + verbs: ["get"] + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "create", "delete"] + - apiGroups: [""] + resources: ["serviceaccounts"] + verbs: ["get", "create", "delete"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["clusterroles"] + verbs: ["get", "create", "delete"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["clusterrolebindings"] + verbs: ["get", "create", "delete"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["roles"] + verbs: ["get", "create", "delete"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["rolebindings"] + verbs: ["get", "create", "delete"] + - apiGroups: ["apps", "extensions"] + resources: ["pods"] + verbs: ["get", "list", "watch"] + - apiGroups: ["apps", "extensions"] + resources: ["services"] + verbs: ["get", "list", "watch"] + - apiGroups: ["", "apps", "extensions"] + resources: ["endpoints"] + verbs: ["get", "list", "watch"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: mizu-runner-clusterrolebindings +subjects: + - kind: User + name: user1 + apiGroup: rbac.authorization.k8s.io +roleRef: + kind: ClusterRole + name: mizu-runner-clusterrole + apiGroup: rbac.authorization.k8s.io diff --git a/examples/roles/permissions-all-namespaces-without-ip-resolution.yaml b/examples/roles/permissions-all-namespaces-without-ip-resolution.yaml index c4e809ac3..bced278e9 100644 --- a/examples/roles/permissions-all-namespaces-without-ip-resolution.yaml +++ b/examples/roles/permissions-all-namespaces-without-ip-resolution.yaml @@ -20,6 +20,9 @@ rules: - apiGroups: [""] resources: ["services/proxy"] verbs: ["get"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "create", "delete"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/examples/roles/permissions-all-namespaces.yaml b/examples/roles/permissions-all-namespaces.yaml index ff1060df0..097d276e0 100644 --- a/examples/roles/permissions-all-namespaces.yaml +++ b/examples/roles/permissions-all-namespaces.yaml @@ -19,6 +19,9 @@ rules: - apiGroups: [""] resources: ["services/proxy"] verbs: ["get"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "create", "delete"] - apiGroups: [""] resources: ["serviceaccounts"] verbs: ["get", "create", "delete"] diff --git a/examples/roles/permissions-ns-daemon.yaml b/examples/roles/permissions-ns-daemon.yaml new file mode 100644 index 000000000..c73513e8c --- /dev/null +++ b/examples/roles/permissions-ns-daemon.yaml @@ -0,0 +1,57 @@ +# This example shows the roles required for a user to be able to use Mizu in a single namespace. +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: mizu-runner-role + namespace: user1 +rules: +- apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "watch", "delete"] +- apiGroups: [ "" ] + resources: [ "deployments" ] + verbs: [ "get", "create", "delete" ] +- apiGroups: [""] + resources: ["services"] + verbs: ["get", "list", "watch", "create", "delete"] +- apiGroups: ["apps"] + resources: ["daemonsets"] + verbs: ["get", "create", "patch", "delete"] +- apiGroups: [""] + resources: ["services/proxy"] + verbs: ["get"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "create", "delete"] +- apiGroups: [""] + resources: ["serviceaccounts"] + verbs: ["get", "create", "delete"] +- apiGroups: ["rbac.authorization.k8s.io"] + resources: ["roles"] + verbs: ["get", "create", "delete"] +- apiGroups: ["rbac.authorization.k8s.io"] + resources: ["rolebindings"] + verbs: ["get", "create", "delete"] +- apiGroups: ["apps", "extensions"] + resources: ["pods"] + verbs: ["get", "list", "watch"] +- apiGroups: ["apps", "extensions"] + resources: ["services"] + verbs: ["get", "list", "watch"] +- apiGroups: ["", "apps", "extensions"] + resources: ["endpoints"] + verbs: ["get", "list", "watch"] +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: mizu-runner-rolebindings + namespace: user1 +subjects: +- kind: User + name: user1 + apiGroup: rbac.authorization.k8s.io +roleRef: + kind: Role + name: mizu-runner-role + apiGroup: rbac.authorization.k8s.io diff --git a/examples/roles/permissions-ns-without-ip-resolution.yaml b/examples/roles/permissions-ns-without-ip-resolution.yaml index 4293f8979..ef14933a1 100644 --- a/examples/roles/permissions-ns-without-ip-resolution.yaml +++ b/examples/roles/permissions-ns-without-ip-resolution.yaml @@ -17,6 +17,9 @@ rules: - apiGroups: [""] resources: ["services/proxy"] verbs: ["get"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "create", "delete"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/examples/roles/permissions-ns.yaml b/examples/roles/permissions-ns.yaml index da60a02e1..3af89afa0 100644 --- a/examples/roles/permissions-ns.yaml +++ b/examples/roles/permissions-ns.yaml @@ -17,6 +17,9 @@ rules: - apiGroups: [""] resources: ["services/proxy"] verbs: ["get"] +- apiGroups: [ "" ] + resources: [ "configmaps" ] + verbs: [ "get", "create", "delete" ] - apiGroups: [""] resources: ["serviceaccounts"] verbs: ["get", "create", "delete"] diff --git a/shared/consts.go b/shared/consts.go index 84efe1219..79e4b84e5 100644 --- a/shared/consts.go +++ b/shared/consts.go @@ -7,6 +7,7 @@ const ( NodeNameEnvVar = "NODE_NAME" TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST" ConfigDirPath = "/app/config/" + DataDirPath = "/app/data/" ValidationRulesFileName = "validation-rules.yaml" ContractFileName = "contract-oas.yaml" ConfigFileName = "mizu-config.json" diff --git a/shared/kubernetes/consts.go b/shared/kubernetes/consts.go index d95ec3b0f..68acdd1ec 100644 --- a/shared/kubernetes/consts.go +++ b/shared/kubernetes/consts.go @@ -4,7 +4,9 @@ const ( MizuResourcesPrefix = "mizu-" ApiServerPodName = MizuResourcesPrefix + "api-server" ClusterRoleBindingName = MizuResourcesPrefix + "cluster-role-binding" + DaemonRoleBindingName = MizuResourcesPrefix + "cluster-role-binding-daemon" ClusterRoleName = MizuResourcesPrefix + "cluster-role" + DaemonRoleName = MizuResourcesPrefix + "cluster-role-daemon" K8sAllNamespaces = "" RoleBindingName = MizuResourcesPrefix + "role-binding" RoleName = MizuResourcesPrefix + "role" @@ -12,5 +14,6 @@ const ( TapperDaemonSetName = MizuResourcesPrefix + "tapper-daemon-set" TapperPodName = MizuResourcesPrefix + "tapper" ConfigMapName = MizuResourcesPrefix + "config" + PersistentVolumeClaimName = MizuResourcesPrefix + "volume-claim" MinKubernetesServerVersion = "1.16.0" ) diff --git a/shared/kubernetes/provider.go b/shared/kubernetes/provider.go index 0a9a057bf..1adda9858 100644 --- a/shared/kubernetes/provider.go +++ b/shared/kubernetes/provider.go @@ -11,6 +11,7 @@ import ( "github.com/up9inc/mizu/shared/semver" "github.com/up9inc/mizu/tap/api" "io" + v1 "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -25,6 +26,7 @@ import ( applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" @@ -83,9 +85,29 @@ func NewProvider(kubeConfigPath string) (*Provider, error) { }, nil } -func (provider *Provider) CurrentNamespace() string { - ns, _, _ := provider.kubernetesConfig.Namespace() - return ns +func NewProviderInCluster() (*Provider, error) { + restClientConfig, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + clientSet, err := getClientSet(restClientConfig) + if err != nil { + return nil, err + } + + return &Provider{ + clientSet: clientSet, + kubernetesConfig: nil, // not relevant in cluster + clientConfig: *restClientConfig, + }, nil +} + +func (provider *Provider) CurrentNamespace() (string, error) { + if provider.kubernetesConfig == nil { + return "", errors.New("kubernetesConfig is nil, mizu cli will not work with in-cluster kubernetes config, use a kubeconfig file when initializing the Provider") + } + ns, _, err := provider.kubernetesConfig.Namespace() + return ns, err } func (provider *Provider) WaitUtilNamespaceDeleted(ctx context.Context, name string) error { @@ -158,7 +180,7 @@ type ApiServerOptions struct { DumpLogs bool } -func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiServerOptions) (*core.Pod, error) { +func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, mountVolumeClaim bool, volumeClaimName string) (*core.Pod, error) { var marshaledSyncEntriesConfig []byte if opts.SyncEntriesConfig != nil { var err error @@ -192,6 +214,36 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS command = append(command, "--namespace", opts.Namespace) } + volumeMounts := []core.VolumeMount{ + { + Name: ConfigMapName, + MountPath: shared.ConfigDirPath, + }, + } + volumes := []core.Volume{ + { + Name: ConfigMapName, + VolumeSource: core.VolumeSource{ + ConfigMap: configMapVolume, + }, + }, + } + + if mountVolumeClaim { + volumes = append(volumes, core.Volume{ + Name: volumeClaimName, + VolumeSource: core.VolumeSource{ + PersistentVolumeClaim: &core.PersistentVolumeClaimVolumeSource{ + ClaimName: volumeClaimName, + }, + }, + }) + volumeMounts = append(volumeMounts, core.VolumeMount{ + Name: volumeClaimName, + MountPath: shared.DataDirPath, + }) + } + port := intstr.FromInt(shared.DefaultApiServerPort) debugMode := "" @@ -202,7 +254,6 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS pod := &core.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: opts.PodName, - Namespace: opts.Namespace, Labels: map[string]string{"app": opts.PodName}, }, Spec: core.PodSpec{ @@ -211,12 +262,7 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS Name: opts.PodName, Image: opts.PodImage, ImagePullPolicy: opts.ImagePullPolicy, - VolumeMounts: []core.VolumeMount{ - { - Name: ConfigMapName, - MountPath: shared.ConfigDirPath, - }, - }, + VolumeMounts: volumeMounts, Command: command, Env: []core.EnvVar{ { @@ -259,30 +305,51 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS }, }, }, - Volumes: []core.Volume{ - { - Name: ConfigMapName, - VolumeSource: core.VolumeSource{ - ConfigMap: configMapVolume, - }, - }, - }, + Volumes: volumes, DNSPolicy: core.DNSClusterFirstWithHostNet, TerminationGracePeriodSeconds: new(int64), }, } + //define the service account only when it exists to prevent pod crash if opts.ServiceAccountName != "" { pod.Spec.ServiceAccountName = opts.ServiceAccountName } - return provider.clientSet.CoreV1().Pods(opts.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + return pod, nil +} + + +func (provider *Provider) CreatePod(ctx context.Context, namespace string, podSpec *core.Pod) (*core.Pod, error) { + return provider.clientSet.CoreV1().Pods(namespace).Create(ctx, podSpec, metav1.CreateOptions{}) +} + +func (provider *Provider) CreateDeployment(ctx context.Context, namespace string, deploymentName string, podSpec *core.Pod) (*v1.Deployment, error) { + if _, keyExists := podSpec.ObjectMeta.Labels["app"]; keyExists == false { + return nil, errors.New("pod spec must contain 'app' label") + } + podTemplate := &core.PodTemplateSpec{ + ObjectMeta: podSpec.ObjectMeta, + Spec: podSpec.Spec, + } + deployment := &v1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: deploymentName, + }, + Spec: v1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": podSpec.ObjectMeta.Labels["app"]}, + }, + Template: *podTemplate, + Strategy: v1.DeploymentStrategy{}, + }, + } + return provider.clientSet.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}) } func (provider *Provider) CreateService(ctx context.Context, namespace string, serviceName string, appLabelValue string) (*core.Service, error) { service := core.Service{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, - Namespace: namespace, }, Spec: core.ServiceSpec{ Ports: []core.ServicePort{{TargetPort: intstr.FromInt(shared.DefaultApiServerPort), Port: 80}}, @@ -315,7 +382,6 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string, serviceAccount := &core.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: serviceAccountName, - Namespace: namespace, Labels: map[string]string{"mizu-cli-version": version}, }, } @@ -369,7 +435,6 @@ func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context, serviceAccount := &core.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: serviceAccountName, - Namespace: namespace, Labels: map[string]string{"mizu-cli-version": version}, }, } @@ -419,6 +484,49 @@ func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context, return nil } +func (provider *Provider) CreateDaemonsetRBAC(ctx context.Context, namespace string, serviceAccountName string, roleName string, roleBindingName string, version string) error { + role := &rbac.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: roleName, + Labels: map[string]string{"mizu-cli-version": version}, + }, + Rules: []rbac.PolicyRule{ + { + APIGroups: []string{"apps"}, + Resources: []string{"daemonsets"}, + Verbs: []string{"patch", "get", "list", "create", "delete"}, + }, + }, + } + roleBinding := &rbac.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: roleBindingName, + Labels: map[string]string{"mizu-cli-version": version}, + }, + RoleRef: rbac.RoleRef{ + Name: roleName, + Kind: "Role", + APIGroup: "rbac.authorization.k8s.io", + }, + Subjects: []rbac.Subject{ + { + Kind: "ServiceAccount", + Name: serviceAccountName, + Namespace: namespace, + }, + }, + } + _, err := provider.clientSet.RbacV1().Roles(namespace).Create(ctx, role, metav1.CreateOptions{}) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return err + } + _, err = provider.clientSet.RbacV1().RoleBindings(namespace).Create(ctx, roleBinding, metav1.CreateOptions{}) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return err + } + return nil +} + func (provider *Provider) RemoveNamespace(ctx context.Context, name string) error { err := provider.clientSet.CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{}) return provider.handleRemovalError(err) @@ -454,6 +562,11 @@ func (provider *Provider) RemovePod(ctx context.Context, namespace string, podNa return provider.handleRemovalError(err) } +func (provider *Provider) RemoveDeployment(ctx context.Context, namespace string, deploymentName string) error { + err := provider.clientSet.AppsV1().Deployments(namespace).Delete(ctx, deploymentName, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) +} + func (provider *Provider) RemoveConfigMap(ctx context.Context, namespace string, configMapName string) error { err := provider.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, configMapName, metav1.DeleteOptions{}) return provider.handleRemovalError(err) @@ -496,7 +609,6 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, }, ObjectMeta: metav1.ObjectMeta{ Name: configMapName, - Namespace: namespace, }, Data: configMapData, } @@ -703,8 +815,7 @@ func (provider *Provider) GetPodLogs(ctx context.Context, namespace string, podN } func (provider *Provider) GetNamespaceEvents(ctx context.Context, namespace string) (string, error) { - eventsOpts := metav1.ListOptions{} - eventList, err := provider.clientSet.CoreV1().Events(namespace).List(ctx, eventsOpts) + eventList, err := provider.clientSet.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return "", fmt.Errorf("error getting events on ns: %s, %w", namespace, err) } @@ -712,6 +823,45 @@ func (provider *Provider) GetNamespaceEvents(ctx context.Context, namespace stri return eventList.String(), nil } +func (provider *Provider) IsDefaultStorageProviderAvailable(ctx context.Context) (bool, error) { + storageClassList, err := provider.clientSet.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{}) + if err != nil { + return false, err + } + for _, storageClass := range storageClassList.Items { + if storageClass.Annotations["storageclass.kubernetes.io/is-default-class"] == "true" { + return true, nil + } + } + return false, nil +} + +func (provider *Provider) CreatePersistentVolumeClaim(ctx context.Context, namespace string, volumeClaimName string, sizeLimitBytes int64) (*core.PersistentVolumeClaim, error) { + sizeLimitQuantity := resource.NewQuantity(sizeLimitBytes, resource.DecimalSI) + volumeClaim := &core.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: volumeClaimName, + }, + Spec: core.PersistentVolumeClaimSpec{ + AccessModes: []core.PersistentVolumeAccessMode{core.ReadWriteOnce}, + Resources: core.ResourceRequirements{ + Limits: core.ResourceList{ + core.ResourceStorage: *sizeLimitQuantity, + }, + Requests: core.ResourceList{ + core.ResourceStorage: *sizeLimitQuantity, + }, + }, + }, + } + + return provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, volumeClaim, metav1.CreateOptions{}) +} + +func (provider *Provider) RemovePersistentVolumeClaim(ctx context.Context, namespace string, volumeClaimName string) error { + return provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, volumeClaimName, metav1.DeleteOptions{}) +} + func getClientSet(config *restclient.Config) (*kubernetes.Clientset, error) { clientSet, err := kubernetes.NewForConfig(config) if err != nil { diff --git a/shared/kubernetes/utils.go b/shared/kubernetes/utils.go index f77f27f27..6a020e259 100644 --- a/shared/kubernetes/utils.go +++ b/shared/kubernetes/utils.go @@ -1,6 +1,7 @@ package kubernetes import ( + "github.com/up9inc/mizu/shared" core "k8s.io/api/core/v1" "regexp" ) @@ -55,3 +56,12 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { } return missingPods } + + +func GetPodInfosForPods(pods []core.Pod) []shared.PodInfo { + podInfos := make([]shared.PodInfo, 0) + for _, pod := range pods { + podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace}) + } + return podInfos +} diff --git a/shared/models.go b/shared/models.go index e857d4cb2..10b47883b 100644 --- a/shared/models.go +++ b/shared/models.go @@ -27,8 +27,18 @@ type Resources struct { } type MizuAgentConfig struct { - TapTargetRegex api.SerializableRegexp `yaml:"tapTargetRegex"` - MaxDBSizeBytes int64 `yaml:"maxDBSizeBytes"` + TapTargetRegex api.SerializableRegexp `json:"tapTargetRegex"` + MaxDBSizeBytes int64 `json:"maxDBSizeBytes"` + DaemonMode bool `json:"daemonMode"` + TargetNamespaces []string `json:"targetNamespaces"` + AgentImage string `json:"agentImage"` + PullPolicy string `json:"pullPolicy"` + DumpLogs bool `json:"dumpLogs"` + IgnoredUserAgents []string `json:"ignoredUserAgents"` + TapperResources Resources `json:"tapperResources"` + MizuResourcesNamespace string `json:"mizuResourceNamespace"` + MizuApiFilteringOptions api.TrafficFilteringOptions `json:"mizuApiFilteringOptions"` + AgentDatabasePath string `json:"agentDatabasePath"` } type WebSocketMessageMetadata struct { @@ -94,6 +104,11 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc } } +type HealthResponse struct { + TapStatus TapStatus `json:"tapStatus"` + TappersCount int `json:"tappersCount"` +} + type VersionResponse struct { SemVer string `json:"semver"` } diff --git a/shared/sliceUtils.go b/shared/sliceUtils.go index 6d08c70b5..4feaa5bdf 100644 --- a/shared/sliceUtils.go +++ b/shared/sliceUtils.go @@ -19,7 +19,6 @@ func ContainsInt(slice []int, containsValue int) bool { return false } - func Unique(slice []string) []string { keys := make(map[string]bool) var list []string diff --git a/shared/sliceUtils_test.go b/shared/sliceUtils_test.go index 97d79b0bb..e5b8bf43a 100644 --- a/shared/sliceUtils_test.go +++ b/shared/sliceUtils_test.go @@ -93,8 +93,8 @@ func TestContainsNilSlice(t *testing.T) { func TestUniqueNoDuplicateValues(t *testing.T) { tests := []struct { - Slice []string - Expected []string + Slice []string + Expected []string }{ {Slice: []string{"apple", "orange", "banana", "grapes"}, Expected: []string{"apple", "orange", "banana", "grapes"}}, {Slice: []string{"dog", "cat", "mouse"}, Expected: []string{"dog", "cat", "mouse"}}, @@ -112,8 +112,8 @@ func TestUniqueNoDuplicateValues(t *testing.T) { func TestUniqueDuplicateValues(t *testing.T) { tests := []struct { - Slice []string - Expected []string + Slice []string + Expected []string }{ {Slice: []string{"apple", "apple", "orange", "orange", "banana", "banana", "grapes", "grapes"}, Expected: []string{"apple", "orange", "banana", "grapes"}}, {Slice: []string{"dog", "cat", "cat", "mouse"}, Expected: []string{"dog", "cat", "mouse"}},