mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-02 21:21:02 +00:00
730 lines
29 KiB
Go
730 lines
29 KiB
Go
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/getkin/kin-openapi/openapi3"
|
|
"gopkg.in/yaml.v3"
|
|
core "k8s.io/api/core/v1"
|
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
|
|
"github.com/up9inc/mizu/cli/apiserver"
|
|
"github.com/up9inc/mizu/cli/cmd/goUtils"
|
|
"github.com/up9inc/mizu/cli/config"
|
|
"github.com/up9inc/mizu/cli/config/configStructs"
|
|
"github.com/up9inc/mizu/cli/errormessage"
|
|
"github.com/up9inc/mizu/cli/mizu"
|
|
"github.com/up9inc/mizu/cli/mizu/fsUtils"
|
|
"github.com/up9inc/mizu/cli/uiUtils"
|
|
"github.com/up9inc/mizu/shared"
|
|
"github.com/up9inc/mizu/shared/kubernetes"
|
|
"github.com/up9inc/mizu/shared/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
const cleanupTimeout = time.Minute
|
|
|
|
type tapState struct {
|
|
startTime time.Time
|
|
targetNamespaces []string
|
|
|
|
apiServerService *core.Service
|
|
tapperSyncer *kubernetes.MizuTapperSyncer
|
|
mizuServiceAccountExists bool
|
|
}
|
|
|
|
var state tapState
|
|
var apiProvider *apiserver.Provider
|
|
|
|
func RunMizuTap() {
|
|
state.startTime = time.Now()
|
|
|
|
mizuApiFilteringOptions, err := getMizuApiFilteringOptions()
|
|
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error parsing regex-masking: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
|
|
var serializedValidationRules string
|
|
if config.Config.Tap.EnforcePolicyFile != "" {
|
|
serializedValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
}
|
|
|
|
// Read and validate the OAS file
|
|
var serializedContract string
|
|
if config.Config.Tap.ContractFile != "" {
|
|
bytes, err := ioutil.ReadFile(config.Config.Tap.ContractFile)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading contract file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
serializedContract = string(bytes)
|
|
|
|
ctx := context.Background()
|
|
loader := &openapi3.Loader{Context: ctx}
|
|
doc, err := loader.LoadFromData(bytes)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error loading contract file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
err = doc.Validate(ctx)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error validating contract file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
}
|
|
|
|
kubernetesProvider, err := getKubernetesProviderForCli()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel() // cancel will be called when this function exits
|
|
|
|
state.targetNamespaces = getNamespaces(kubernetesProvider)
|
|
|
|
serializedMizuConfig, err := config.GetSerializedMizuAgentConfig(state.targetNamespaces, mizuApiFilteringOptions)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error composing mizu config: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
|
|
if config.Config.IsNsRestrictedMode() {
|
|
if len(state.targetNamespaces) != 1 || !shared.Contains(state.targetNamespaces, config.Config.MizuResourcesNamespace) {
|
|
logger.Log.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+
|
|
"You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.MizuResourcesNamespaceConfigName)
|
|
return
|
|
}
|
|
}
|
|
|
|
var namespacesStr string
|
|
if !shared.Contains(state.targetNamespaces, kubernetes.K8sAllNamespaces) {
|
|
namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(state.targetNamespaces, "\", \""))
|
|
} else {
|
|
namespacesStr = "all namespaces"
|
|
}
|
|
|
|
logger.Log.Infof("Tapping pods in %s", namespacesStr)
|
|
|
|
if err := printTappedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error listing pods: %v", errormessage.FormatError(err)))
|
|
}
|
|
|
|
if config.Config.Tap.DryRun {
|
|
return
|
|
}
|
|
|
|
logger.Log.Infof("Waiting for Mizu Agent to start...")
|
|
if err := createMizuResources(ctx, cancel, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
|
|
|
var statusError *k8serrors.StatusError
|
|
if errors.As(err, &statusError) {
|
|
if statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists {
|
|
logger.Log.Info("Mizu is already running in this namespace, change the `mizu-resources-namespace` configuration or run `mizu clean` to remove the currently running Mizu instance")
|
|
}
|
|
}
|
|
return
|
|
}
|
|
if config.Config.Tap.DaemonMode {
|
|
if err := handleDaemonModePostCreation(ctx, cancel, kubernetesProvider, state.targetNamespaces); err != nil {
|
|
defer finishMizuExecution(kubernetesProvider, apiProvider)
|
|
cancel()
|
|
} else {
|
|
logger.Log.Infof(uiUtils.Magenta, "Mizu is now running in daemon mode, run `mizu view` to connect to the mizu daemon instance")
|
|
}
|
|
} else {
|
|
defer finishMizuExecution(kubernetesProvider, apiProvider)
|
|
|
|
go goUtils.HandleExcWrapper(watchApiServerEvents, ctx, kubernetesProvider, cancel)
|
|
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
|
|
|
|
// block until exit signal or error
|
|
waitForFinish(ctx, cancel)
|
|
}
|
|
}
|
|
|
|
func handleDaemonModePostCreation(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
|
|
if err := printTappedPodsPreview(ctx, kubernetesProvider, namespaces); err != nil {
|
|
return err
|
|
}
|
|
|
|
apiProvider := apiserver.NewProvider(GetApiServerUrl(), 90, 1*time.Second)
|
|
|
|
if err := waitForDaemonModeToBeReady(cancel, kubernetesProvider, apiProvider); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
this function is a bit problematic as it might be detached from the actual pods the mizu api server will tap.
|
|
The alternative would be to wait for api server to be ready and then query it for the pods it listens to, this has
|
|
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
|
|
*/
|
|
func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
|
|
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
|
|
return err
|
|
} else {
|
|
if len(matchingPods) == 0 {
|
|
printNoPodsFoundSuggestion(namespaces)
|
|
}
|
|
for _, tappedPod := range matchingPods {
|
|
logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name))
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func waitForDaemonModeToBeReady(cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) error {
|
|
logger.Log.Info("Waiting for mizu to be ready... (may take a few minutes)")
|
|
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
|
|
|
// TODO: TRA-3903 add a smarter test to see that tapping/pod watching is functioning properly
|
|
if err := apiProvider.TestConnection(); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Mizu was not ready in time, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, mizuApiFilteringOptions api.TrafficFilteringOptions, startTime time.Time) error {
|
|
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
|
|
TargetNamespaces: targetNamespaces,
|
|
PodFilterRegex: *config.Config.Tap.PodRegex(),
|
|
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
|
|
AgentImage: config.Config.AgentImage,
|
|
TapperResources: config.Config.Tap.TapperResources,
|
|
ImagePullPolicy: config.Config.ImagePullPolicy(),
|
|
LogLevel: config.Config.LogLevel(),
|
|
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
|
|
MizuApiFilteringOptions: mizuApiFilteringOptions,
|
|
MizuServiceAccountExists: state.mizuServiceAccountExists,
|
|
Istio: config.Config.Tap.Istio,
|
|
}, startTime)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case syncerErr, ok := <-tapperSyncer.ErrorOut:
|
|
if !ok {
|
|
logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop")
|
|
return
|
|
}
|
|
logger.Log.Errorf(uiUtils.Error, getErrorDisplayTextForK8sTapManagerError(syncerErr))
|
|
cancel()
|
|
case _, ok := <-tapperSyncer.TapPodChangesOut:
|
|
if !ok {
|
|
logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop")
|
|
return
|
|
}
|
|
if err := apiProvider.ReportTappedPods(tapperSyncer.CurrentlyTappedPods); err != nil {
|
|
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
|
}
|
|
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
|
if !ok {
|
|
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
|
|
return
|
|
}
|
|
if err := apiProvider.ReportTapperStatus(tapperStatus); err != nil {
|
|
logger.Log.Debugf("[Error] failed update tapper status %v", err)
|
|
}
|
|
case <-ctx.Done():
|
|
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
state.tapperSyncer = tapperSyncer
|
|
|
|
return nil
|
|
}
|
|
|
|
func printNoPodsFoundSuggestion(targetNamespaces []string) {
|
|
var suggestionStr string
|
|
if !shared.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
|
|
suggestionStr = ". You can also try selecting a different namespace with -n or tap all namespaces with -A"
|
|
}
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any currently running pods that match the regex argument, mizu will automatically tap matching pods if any are created later%s", suggestionStr))
|
|
}
|
|
|
|
func getErrorDisplayTextForK8sTapManagerError(err kubernetes.K8sTapManagerError) string {
|
|
switch err.TapManagerReason {
|
|
case kubernetes.TapManagerPodListError:
|
|
return fmt.Sprintf("Failed to update currently tapped pods: %v", err.OriginalError)
|
|
case kubernetes.TapManagerPodWatchError:
|
|
return fmt.Sprintf("Error occured in k8s pod watch: %v", err.OriginalError)
|
|
case kubernetes.TapManagerTapperUpdateError:
|
|
return fmt.Sprintf("Error updating tappers: %v", err.OriginalError)
|
|
default:
|
|
return fmt.Sprintf("Unknown error occured in k8s tap manager: %v", err.OriginalError)
|
|
}
|
|
}
|
|
|
|
func readValidationRules(file string) (string, error) {
|
|
rules, err := shared.DecodeEnforcePolicy(file)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
newContent, _ := yaml.Marshal(&rules)
|
|
return string(newContent), nil
|
|
}
|
|
|
|
func createMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := createMizuConfigmap(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v", errormessage.FormatError(err)))
|
|
}
|
|
|
|
var err error
|
|
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
|
|
if err != nil {
|
|
if !config.Config.Tap.DaemonMode {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to ensure the resources required for IP resolving. Mizu will not resolve target IPs to names. error: %v", errormessage.FormatError(err)))
|
|
}
|
|
}
|
|
|
|
var serviceAccountName string
|
|
if state.mizuServiceAccountExists {
|
|
serviceAccountName = kubernetes.ServiceAccountName
|
|
} else {
|
|
serviceAccountName = ""
|
|
}
|
|
|
|
opts := &kubernetes.ApiServerOptions{
|
|
Namespace: config.Config.MizuResourcesNamespace,
|
|
PodName: kubernetes.ApiServerPodName,
|
|
PodImage: config.Config.AgentImage,
|
|
ServiceAccountName: serviceAccountName,
|
|
IsNamespaceRestricted: config.Config.IsNsRestrictedMode(),
|
|
SyncEntriesConfig: getSyncEntriesConfig(),
|
|
MaxEntriesDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(),
|
|
Resources: config.Config.Tap.ApiServerResources,
|
|
ImagePullPolicy: config.Config.ImagePullPolicy(),
|
|
LogLevel: config.Config.LogLevel(),
|
|
}
|
|
|
|
if config.Config.Tap.DaemonMode {
|
|
if !state.mizuServiceAccountExists {
|
|
defer cleanUpMizuResources(ctx, cancel, kubernetesProvider)
|
|
logger.Log.Fatalf(uiUtils.Red, fmt.Sprintf("Failed to ensure the resources required for mizu to run in daemon mode. cannot proceed. error: %v", errormessage.FormatError(err)))
|
|
}
|
|
if err := createMizuApiServerDeployment(ctx, kubernetesProvider, opts); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if err := createMizuApiServerPod(ctx, kubernetesProvider, opts); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
state.apiServerService, err = kubernetesProvider.CreateService(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, kubernetes.ApiServerPodName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created service: %s", kubernetes.ApiServerPodName)
|
|
|
|
return nil
|
|
}
|
|
|
|
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
|
|
err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName, serializedValidationRules, serializedContract, serializedMizuConfig)
|
|
return err
|
|
}
|
|
|
|
func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Provider) error {
|
|
_, err := kubernetesProvider.CreateNamespace(ctx, config.Config.MizuResourcesNamespace)
|
|
return err
|
|
}
|
|
|
|
func createMizuApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error {
|
|
pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, false, "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err = kubernetesProvider.CreatePod(ctx, config.Config.MizuResourcesNamespace, pod); err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created API server pod: %s", kubernetes.ApiServerPodName)
|
|
return nil
|
|
}
|
|
|
|
func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error {
|
|
volumeClaimCreated := false
|
|
if !config.Config.Tap.NoPersistentVolumeClaim {
|
|
volumeClaimCreated = TryToCreatePersistentVolumeClaim(ctx, kubernetesProvider)
|
|
}
|
|
|
|
pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, volumeClaimCreated, kubernetes.PersistentVolumeClaimName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pod.Spec.Containers[0].LivenessProbe = &core.Probe{
|
|
Handler: core.Handler{
|
|
HTTPGet: &core.HTTPGetAction{
|
|
Path: "/echo",
|
|
Port: intstr.FromInt(shared.DefaultApiServerPort),
|
|
},
|
|
},
|
|
InitialDelaySeconds: 1,
|
|
PeriodSeconds: 10,
|
|
}
|
|
if _, err = kubernetesProvider.CreateDeployment(ctx, config.Config.MizuResourcesNamespace, opts.PodName, pod); err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created API server deployment: %s", kubernetes.ApiServerPodName)
|
|
return nil
|
|
}
|
|
|
|
func TryToCreatePersistentVolumeClaim(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
|
|
isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx)
|
|
if err != nil {
|
|
logger.Log.Warningf(uiUtils.Yellow, "An error occured when checking if a default storage provider exists in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
|
|
logger.Log.Debugf("error checking if default storage class exists: %v", err)
|
|
return false
|
|
} else if !isDefaultStorageClassAvailable {
|
|
logger.Log.Warningf(uiUtils.Yellow, "Could not find default storage provider in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
|
|
return false
|
|
}
|
|
|
|
if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil {
|
|
logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this means mizu data will be lost on mizu-api-server pod restart")
|
|
logger.Log.Debugf("error creating persistent volume claim: %v", err)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
|
|
var compiledRegexSlice []*api.SerializableRegexp
|
|
|
|
if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 {
|
|
compiledRegexSlice = make([]*api.SerializableRegexp, 0)
|
|
for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes {
|
|
compiledRegex, err := api.CompileRegexToSerializableRegexp(regexStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
compiledRegexSlice = append(compiledRegexSlice, compiledRegex)
|
|
}
|
|
}
|
|
|
|
return &api.TrafficFilteringOptions{
|
|
PlainTextMaskingRegexes: compiledRegexSlice,
|
|
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
|
|
DisableRedaction: config.Config.Tap.DisableRedaction,
|
|
}, nil
|
|
}
|
|
|
|
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
|
if !config.Config.Tap.Analysis && config.Config.Tap.Workspace == "" {
|
|
return nil
|
|
}
|
|
|
|
return &shared.SyncEntriesConfig{
|
|
Token: config.Config.Auth.Token,
|
|
Env: config.Config.Auth.EnvName,
|
|
Workspace: config.Config.Tap.Workspace,
|
|
UploadIntervalSec: config.Config.Tap.UploadIntervalSec,
|
|
}
|
|
}
|
|
|
|
func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string {
|
|
leftoverResources := make([]string, 0)
|
|
|
|
if err := kubernetesProvider.RemoveService(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Service %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveDaemonSet(ctx, config.Config.MizuResourcesNamespace, kubernetes.TapperDaemonSetName); err != nil {
|
|
resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", kubernetes.TapperDaemonSetName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveConfigMap(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName); err != nil {
|
|
resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", kubernetes.ConfigMapName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveServicAccount(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Service Account %s in namespace %s", kubernetes.ServiceAccountName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveRole(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Role %s in namespace %s", kubernetes.RoleName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Pod %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
//daemon mode resources
|
|
if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleBindingName); err != nil {
|
|
resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", kubernetes.RoleBindingName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveDeployment(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Deployment %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemovePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName); err != nil {
|
|
resourceDesc := fmt.Sprintf("PersistentVolumeClaim %s in namespace %s", kubernetes.PersistentVolumeClaimName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveRole(ctx, config.Config.MizuResourcesNamespace, kubernetes.DaemonRoleName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Role %s in namespace %s", kubernetes.DaemonRoleName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, kubernetes.DaemonRoleBindingName); err != nil {
|
|
resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", kubernetes.DaemonRoleBindingName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
return leftoverResources
|
|
}
|
|
|
|
func cleanUpNonRestrictedMode(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) []string {
|
|
leftoverResources := make([]string, 0)
|
|
|
|
if err := kubernetesProvider.RemoveNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil {
|
|
resourceDesc := fmt.Sprintf("Namespace %s", config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
} else {
|
|
defer waitUntilNamespaceDeleted(ctx, cancel, kubernetesProvider)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveClusterRole(ctx, kubernetes.ClusterRoleName); err != nil {
|
|
resourceDesc := fmt.Sprintf("ClusterRole %s", kubernetes.ClusterRoleName)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveClusterRoleBinding(ctx, kubernetes.ClusterRoleBindingName); err != nil {
|
|
resourceDesc := fmt.Sprintf("ClusterRoleBinding %s", kubernetes.ClusterRoleBindingName)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
return leftoverResources
|
|
}
|
|
|
|
func handleDeletionError(err error, resourceDesc string, leftoverResources *[]string) {
|
|
logger.Log.Debugf("Error removing %s: %v", resourceDesc, errormessage.FormatError(err))
|
|
*leftoverResources = append(*leftoverResources, resourceDesc)
|
|
}
|
|
|
|
func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
|
|
// Call cancel if a terminating signal was received. Allows user to skip the wait.
|
|
go func() {
|
|
waitForFinish(ctx, cancel)
|
|
}()
|
|
|
|
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, config.Config.MizuResourcesNamespace); err != nil {
|
|
switch {
|
|
case ctx.Err() == context.Canceled:
|
|
logger.Log.Debugf("Do nothing. User interrupted the wait")
|
|
case err == wait.ErrWaitTimeout:
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", config.Config.MizuResourcesNamespace))
|
|
default:
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error while waiting for Namespace %s to be deleted: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
}
|
|
}
|
|
|
|
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
|
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
|
|
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
|
|
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
|
isPodReady := false
|
|
timeAfter := time.After(25 * time.Second)
|
|
for {
|
|
select {
|
|
case wEvent, ok := <-eventChan:
|
|
if !ok {
|
|
eventChan = nil
|
|
continue
|
|
}
|
|
|
|
switch wEvent.Type {
|
|
case kubernetes.EventAdded:
|
|
logger.Log.Debugf("Watching API Server pod loop, added")
|
|
case kubernetes.EventDeleted:
|
|
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
|
|
cancel()
|
|
return
|
|
case kubernetes.EventModified:
|
|
modifiedPod, err := wEvent.ToPod()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, err)
|
|
cancel()
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
|
|
|
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
|
isPodReady = true
|
|
postApiServerStarted(ctx, kubernetesProvider, cancel, err)
|
|
}
|
|
case kubernetes.EventBookmark:
|
|
break
|
|
case kubernetes.EventError:
|
|
break
|
|
}
|
|
case err, ok := <-errorChan:
|
|
if !ok {
|
|
errorChan = nil
|
|
continue
|
|
}
|
|
|
|
logger.Log.Errorf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
|
|
cancel()
|
|
|
|
case <-timeAfter:
|
|
if !isPodReady {
|
|
logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
|
|
cancel()
|
|
}
|
|
case <-ctx.Done():
|
|
logger.Log.Debugf("Watching API Server pod loop, ctx done")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
|
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s", kubernetes.ApiServerPodName))
|
|
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, podExactRegex, "pod")
|
|
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
|
|
for {
|
|
select {
|
|
case wEvent, ok := <-eventChan:
|
|
if !ok {
|
|
eventChan = nil
|
|
continue
|
|
}
|
|
|
|
event, err := wEvent.ToEvent()
|
|
if err != nil {
|
|
logger.Log.Errorf(fmt.Sprintf("Error parsing Mizu resource event: %+v", err))
|
|
}
|
|
|
|
if state.startTime.After(event.CreationTimestamp.Time) {
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf(
|
|
fmt.Sprintf("Watching API server events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
|
|
event.Name,
|
|
event.CreationTimestamp.Time,
|
|
event.Regarding.Name,
|
|
event.Regarding.Kind,
|
|
event.Reason,
|
|
event.Note))
|
|
|
|
switch event.Reason {
|
|
case "FailedScheduling", "Failed":
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Mizu API Server status: %s - %s", event.Reason, event.Note))
|
|
cancel()
|
|
break
|
|
}
|
|
case err, ok := <-errorChan:
|
|
if !ok {
|
|
errorChan = nil
|
|
continue
|
|
}
|
|
|
|
logger.Log.Errorf("Watching API server events loop, error: %+v", err)
|
|
case <-ctx.Done():
|
|
logger.Log.Debugf("Watching API server events loop, ctx done")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, err error) {
|
|
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
|
|
|
url := GetApiServerUrl()
|
|
if err := apiProvider.TestConnection(); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
|
cancel()
|
|
return
|
|
}
|
|
options, _ := getMizuApiFilteringOptions()
|
|
if err = startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error starting mizu tapper syncer: %v", err))
|
|
cancel()
|
|
}
|
|
|
|
logger.Log.Infof("Mizu is available at %s", url)
|
|
if !config.Config.HeadlessMode {
|
|
uiUtils.OpenBrowser(url)
|
|
}
|
|
}
|
|
|
|
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
|
|
if config.Config.Tap.AllNamespaces {
|
|
return []string{kubernetes.K8sAllNamespaces}
|
|
} else if len(config.Config.Tap.Namespaces) > 0 {
|
|
return shared.Unique(config.Config.Tap.Namespaces)
|
|
} else {
|
|
currentNamespace, err := kubernetesProvider.CurrentNamespace()
|
|
if err != nil {
|
|
logger.Log.Fatalf(uiUtils.Red, fmt.Sprintf("error getting current namespace: %+v", err))
|
|
}
|
|
return []string{currentNamespace}
|
|
}
|
|
}
|
|
|
|
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, error) {
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
if err := kubernetesProvider.CreateMizuRBAC(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.ClusterRoleName, kubernetes.ClusterRoleBindingName, mizu.RBACVersion); err != nil {
|
|
return false, err
|
|
}
|
|
} else {
|
|
if err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.RoleName, kubernetes.RoleBindingName, mizu.RBACVersion); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
if config.Config.Tap.DaemonMode {
|
|
if err := kubernetesProvider.CreateDaemonsetRBAC(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.DaemonRoleName, kubernetes.DaemonRoleBindingName, mizu.RBACVersion); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|