mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-20 17:48:20 +00:00
* Separate HTTP related code into `extensions/http` as a Go plugin * Move `extensions` folder into `tap` folder * Move HTTP files into `tap/extensions/lib` for now * Replace `orcaman/concurrent-map` with `sync.Map` * Remove `grpc_assembler.go` * Remove `github.com/up9inc/mizu/tap/extensions/http/lib` * Add a build script to automatically build extensions from a known path and load them * Start to define the extension API * Implement the `run()` function for the TCP stream * Add support of defining multiple ports to the extension API * Set the extension name inside the extension * Declare the `Dissect` function in the extension API * Dissect HTTP request from inside the HTTP extension * Make the distinction of outbound and inbound ports * Dissect HTTP response from inside the HTTP extension * Bring back the HTTP request-response pair matcher * Return a `*api.RequestResponsePair` from the dissection * Bring back the gRPC-HTTP/2 parser * Fix the issues in `handleHTTP1ClientStream` and `handleHTTP1ServerStream` * Call a function pointer to emit dissected data back to the `tap` package * roee changes - trying to fix agent to work with the "api" object) - ***still not working*** * small mistake in the conflicts * Fix the issues that are introduced by the merge conflict * Add `Emitter` interface to the API and send `OutputChannelItem`(s) to `OutputChannel` * Fix the `HTTP1` handlers * Set `ConnectionInfo` in HTTP handlers * Fix the `Dockerfile` to build the extensions * remove some unwanted code * no message * Re-enable `getStreamProps` function * Migrate back from `gopacket/tcpassembly` to `gopacket/reassembly` * Introduce `HTTPPayload` struct and `HTTPPayloader` interface to `MarshalJSON()` all the data structures that are returned by the HTTP protocol * Read `socketHarOutChannel` instead of `filteredHarChannel` * Connect `OutputChannelItem` to the last WebSocket means that finally the web UI started to work again * Add `.env.example` to React app * Marshal and unmarshal `*http.Request`, `*http.Response` pairs * Move `loadExtensions` into `main.go` and map extensions into `extensionsMap` * Add `Summarize()` method to the `Dissector` interface * Add `Analyze` method to the `Dissector` interface and `MizuEntry` to the extension API * Add `Protocol` struct and make it effect the UI * Refactor `BaseEntryDetails` struct and display the source and destination ports in the UI * Display the protocol name inside the details layout * Add `Represent` method to the `Dissector` interface and manipulate the UI through this method * Make the protocol color affect the details layout color and write protocol abbreviation vertically * Remove everything HTTP related from the `tap` package and make the extension system fully functional * Fix the TypeScript warnings * Bring in the files related AMQP into `amqp` directory * Add `--nodefrag` flag to the tapper and bring in the main AMQP code * Implement the AMQP `BasicPublish` and fix some issues in the UI when the response payload is missing * Implement `representBasicPublish` method * Fix several minor issues * Implement the AMQP `BasicDeliver` * Implement the AMQP `QueueDeclare` * Implement the AMQP `ExchangeDeclare` * Implement the AMQP `ConnectionStart` * Implement the AMQP `ConnectionClose` * Implement the AMQP `QueueBind` * Implement the AMQP `BasicConsume` * Fix an issue in `ConnectionStart` * Fix a linter error * Bring in the files related Kafka into `kafka` directory * Fix the build errors in Kafka Go files * Implement `Dissect` method of Kafka and adapt request-response pair matcher to asynchronous client-server stream * Do the "Is reversed?" checked inside `getStreamProps` and fix an issue in Kafka `Dissect` method * Implement `Analyze`, `Summarize` methods of Kafka * Implement the representations for Kafka `Metadata`, `RequestHeader` and `ResponseHeader` * Refactor the AMQP and Kafka implementations to create the summary string only inside the `Analyze` method * Implement the representations for Kafka `ApiVersions` * Implement the representations for Kafka `Produce` * Implement the representations for Kafka `Fetch` * Implement the representations for Kafka `ListOffsets`, `CreateTopics` and `DeleteTopics` * Fix the encoding of AMQP `BasicPublish` and `BasicDeliver` body * Remove the unnecessary logging * Remove more logging * Introduce `Version` field to `Protocol` struct for dynamically switching the HTTP protocol to HTTP/2 * Fix the issues in analysis and representation of HTTP/2 (gRPC) protocol * Fix the issues in summary section of details layout for HTTP/2 (gRPC) protocol * Fix the read errors that freezes the sniffer in HTTP and Kafka * Fix the issues in HTTP POST data * Fix one more issue in HTTP POST data * Fix an infinite loop in Kafka * Fix another freezing issue in Kafka * Revert "UI Infra - Support multiple entry types + refactoring (#211)" This reverts commitf74a52d4dc
. * Fix more issues that are introduced by the merge * Fix the status code in the summary section * adding the cleaner again (why we removed it?). add TODO: on the extension loop . * fix dockerfile (remove deleting .env file) - it is found in dockerignore and fails to build if the file not exists * fix GetEntrties ("/entries" endpoint) - working with "tapApi.BaseEntryDetail" (moved from shared) * Fix an issue in the UI summary section * Refactor the protocol payload structs * Fix a log message in the passive tapper * Adapt `APP_PORTS` environment variable to the new extension system and change its format to `APP_PORTS='{"http": ["8001"]}' ` * Revert "fix dockerfile (remove deleting .env file) - it is found in dockerignore and fails to build if the file not exists" This reverts commit4f514ae1f4
. * Bring in the necessary changes fromf74a52d4dc
* Open the API server URL in the web browser as soon as Mizu is ready * Make the TCP reader consists of a single Go routine (instead of two) and try to dissect in both client and server mode by rewinding * Swap `TcpID` without overwriting it * Sort extension by priority * Try to dissect with looping through all the extensions * fix getStreamProps function. (it should be passed from CLI as it was before). * Turn TCP reader back into two Goroutines (client and server) * typo * Learn `isClient` from the TCP stream * Set `viewer` style `overflow: "auto"` * Fix the memory leaks in AMQP and Kafka dissectors * Revert some of the changes inbe7c65eb6d
* Remove `allExtensionPorts` since it's no longer needed * Remove `APP_PORTS` since it's no longer needed * Fix all of the minor issues in the React code * Check Kafka header size and fail-fast * Break the dissectors loop upon a successful dissection * Don't break the dissector loop. Protocols might collide * Improve the HTTP request-response counter (still not perfect) * Make the HTTP request-response counter perfect * Revert "Revert some of the changes in be7c65eb6d3fb657a059707da3ca559937e59739" This reverts commit08e7d786d8
. * Bring back `filterItems` and `isHealthCheckByUserAgent` functions * Remove some development artifacts * remove unused and commented lines that are not relevant * Fix the performance in TCP stream factory. Make it create two `tcpReader`(s) per extension * Change a log to debug * Make `*api.CounterPair` a field of `tcpReader` * Set `isTapTarget` to always `true` again since `filterAuthorities` implementation has problems * Remove a variable that's only used for logging even though not introduced by this branch * Bring back the `NumberOfRules` field of `ApplicableRules` struct * Remove the unused `NewEntry` function * Move `k8sResolver == nil` check to a more appropriate place * default healthChecksUserAgentHeaders should be empty array (like the default config value) * remove spam console.log * Rules button cause app to crash (access the service via incorrect property) * Ignore all .env* files in docker build. * Better caching in dockerfile: only copy go.mod before go mod download. * Check for errors while loading an extension * Add a comment about why `Protocol` is not a pointer * Bring back the call to `deleteOlderThan` * Remove the `nil` check * Reduce the maximum allowed AMQP message from 128MB to 1MB * Fix an error that only occurs when a Kafka broker is initiating * Revert the change inb2abd7b990
* Fix the service name resolution in all protocols * Remove the `anydirection` flag and fix the issue in `filterAuthorities` * Pass `sync.Map` by reference to `deleteOlderThan` method * Fix the packet capture issue in standalone mode that's introduced by the removal of `anydirection` * Temporarily resolve the memory exhaustion in AMQP * Fix a nil pointer dereference error * Fix the CLI build error * Fix a memory leak that's identified by `pprof` Co-authored-by: Roee Gadot <roee.gadot@up9.com> Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
571 lines
21 KiB
Go
571 lines
21 KiB
Go
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/cli/apiserver"
|
|
"github.com/up9inc/mizu/cli/config"
|
|
"github.com/up9inc/mizu/cli/config/configStructs"
|
|
"github.com/up9inc/mizu/cli/errormessage"
|
|
"github.com/up9inc/mizu/cli/kubernetes"
|
|
"github.com/up9inc/mizu/cli/logger"
|
|
"github.com/up9inc/mizu/cli/mizu"
|
|
"github.com/up9inc/mizu/cli/mizu/fsUtils"
|
|
"github.com/up9inc/mizu/cli/mizu/goUtils"
|
|
"github.com/up9inc/mizu/cli/telemetry"
|
|
"github.com/up9inc/mizu/cli/uiUtils"
|
|
"github.com/up9inc/mizu/shared"
|
|
"github.com/up9inc/mizu/shared/debounce"
|
|
yaml "gopkg.in/yaml.v3"
|
|
core "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
)
|
|
|
|
const (
|
|
cleanupTimeout = time.Minute
|
|
updateTappersDelay = 5 * time.Second
|
|
)
|
|
|
|
type tapState struct {
|
|
apiServerService *core.Service
|
|
currentlyTappedPods []core.Pod
|
|
mizuServiceAccountExists bool
|
|
doNotRemoveConfigMap bool
|
|
}
|
|
|
|
var state tapState
|
|
|
|
func RunMizuTap() {
|
|
mizuApiFilteringOptions, err := getMizuApiFilteringOptions()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error parsing regex-masking: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
var mizuValidationRules string
|
|
if config.Config.Tap.EnforcePolicyFile != "" {
|
|
mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
}
|
|
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath())
|
|
if err != nil {
|
|
logger.Log.Error(err)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel() // cancel will be called when this function exits
|
|
|
|
targetNamespaces := getNamespaces(kubernetesProvider)
|
|
|
|
if config.Config.IsNsRestrictedMode() {
|
|
if len(targetNamespaces) != 1 || !mizu.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) {
|
|
logger.Log.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+
|
|
"You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.MizuResourcesNamespaceConfigName)
|
|
return
|
|
}
|
|
}
|
|
|
|
var namespacesStr string
|
|
if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
|
|
namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \""))
|
|
} else {
|
|
namespacesStr = "all namespaces"
|
|
}
|
|
|
|
logger.Log.Infof("Tapping pods in %s", namespacesStr)
|
|
|
|
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
|
|
if len(state.currentlyTappedPods) == 0 {
|
|
var suggestionStr string
|
|
if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
|
|
suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A"
|
|
}
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
|
|
}
|
|
|
|
if config.Config.Tap.DryRun {
|
|
return
|
|
}
|
|
|
|
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
|
|
|
defer finishMizuExecution(kubernetesProvider)
|
|
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
|
|
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
|
|
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel)
|
|
|
|
//block until exit signal or error
|
|
waitForFinish(ctx, cancel)
|
|
}
|
|
|
|
func readValidationRules(file string) (string, error) {
|
|
rules, err := shared.DecodeEnforcePolicy(file)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
newContent, _ := yaml.Marshal(&rules)
|
|
return string(newContent), nil
|
|
}
|
|
|
|
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *shared.TrafficFilteringOptions, mizuValidationRules string) error {
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := createMizuApiServer(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
|
|
state.doNotRemoveConfigMap = true
|
|
} else if mizuValidationRules == "" {
|
|
state.doNotRemoveConfigMap = true
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, data string) error {
|
|
err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName, data)
|
|
return err
|
|
}
|
|
|
|
func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Provider) error {
|
|
_, err := kubernetesProvider.CreateNamespace(ctx, config.Config.MizuResourcesNamespace)
|
|
return err
|
|
}
|
|
|
|
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
|
var err error
|
|
|
|
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
|
|
if err != nil {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to ensure the resources required for IP resolving. Mizu will not resolve target IPs to names. error: %v", errormessage.FormatError(err)))
|
|
}
|
|
|
|
var serviceAccountName string
|
|
if state.mizuServiceAccountExists {
|
|
serviceAccountName = mizu.ServiceAccountName
|
|
} else {
|
|
serviceAccountName = ""
|
|
}
|
|
|
|
opts := &kubernetes.ApiServerOptions{
|
|
Namespace: config.Config.MizuResourcesNamespace,
|
|
PodName: mizu.ApiServerPodName,
|
|
PodImage: config.Config.AgentImage,
|
|
ServiceAccountName: serviceAccountName,
|
|
IsNamespaceRestricted: config.Config.IsNsRestrictedMode(),
|
|
MizuApiFilteringOptions: mizuApiFilteringOptions,
|
|
MaxEntriesDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(),
|
|
Resources: config.Config.Tap.ApiServerResources,
|
|
ImagePullPolicy: config.Config.ImagePullPolicy(),
|
|
}
|
|
_, err = kubernetesProvider.CreateMizuApiServerPod(ctx, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created API server pod: %s", mizu.ApiServerPodName)
|
|
|
|
state.apiServerService, err = kubernetesProvider.CreateService(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created service: %s", mizu.ApiServerPodName)
|
|
|
|
return nil
|
|
}
|
|
|
|
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
|
|
var compiledRegexSlice []*shared.SerializableRegexp
|
|
|
|
if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 {
|
|
compiledRegexSlice = make([]*shared.SerializableRegexp, 0)
|
|
for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes {
|
|
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
compiledRegexSlice = append(compiledRegexSlice, compiledRegex)
|
|
}
|
|
}
|
|
|
|
return &shared.TrafficFilteringOptions{
|
|
PlainTextMaskingRegexes: compiledRegexSlice,
|
|
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
|
|
DisableRedaction: config.Config.Tap.DisableRedaction,
|
|
}, nil
|
|
}
|
|
|
|
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string) error {
|
|
if len(nodeToTappedPodIPMap) > 0 {
|
|
var serviceAccountName string
|
|
if state.mizuServiceAccountExists {
|
|
serviceAccountName = mizu.ServiceAccountName
|
|
} else {
|
|
serviceAccountName = ""
|
|
}
|
|
|
|
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
|
|
ctx,
|
|
config.Config.MizuResourcesNamespace,
|
|
mizu.TapperDaemonSetName,
|
|
config.Config.AgentImage,
|
|
mizu.TapperPodName,
|
|
fmt.Sprintf("%s.%s.svc.cluster.local", state.apiServerService.Name, state.apiServerService.Namespace),
|
|
nodeToTappedPodIPMap,
|
|
serviceAccountName,
|
|
config.Config.Tap.TapperResources,
|
|
config.Config.ImagePullPolicy(),
|
|
); err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
|
|
} else {
|
|
if err := kubernetesProvider.RemoveDaemonSet(ctx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func finishMizuExecution(kubernetesProvider *kubernetes.Provider) {
|
|
telemetry.ReportAPICalls()
|
|
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
|
|
defer cancel()
|
|
dumpLogsIfNeeded(kubernetesProvider, removalCtx)
|
|
cleanUpMizuResources(kubernetesProvider, removalCtx, cancel)
|
|
}
|
|
|
|
func dumpLogsIfNeeded(kubernetesProvider *kubernetes.Provider, removalCtx context.Context) {
|
|
if !config.Config.DumpLogs {
|
|
return
|
|
}
|
|
mizuDir := mizu.GetMizuFolderPath()
|
|
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
|
|
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
|
|
logger.Log.Errorf("Failed dump logs %v", err)
|
|
}
|
|
}
|
|
|
|
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider, removalCtx context.Context, cancel context.CancelFunc) {
|
|
logger.Log.Infof("\nRemoving mizu resources\n")
|
|
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
if err := kubernetesProvider.RemoveNamespace(removalCtx, config.Config.MizuResourcesNamespace); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Namespace %s: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
} else {
|
|
if err := kubernetesProvider.RemovePod(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Pod %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveService(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing DaemonSet %s in namespace %s: %v", mizu.TapperDaemonSetName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
|
|
if !state.doNotRemoveConfigMap {
|
|
if err := kubernetesProvider.RemoveConfigMap(removalCtx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing ConfigMap %s in namespace %s: %v", mizu.ConfigMapName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
if state.mizuServiceAccountExists {
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
if err := kubernetesProvider.RemoveNonNamespacedResources(removalCtx, mizu.ClusterRoleName, mizu.ClusterRoleBindingName); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing non-namespaced resources: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
} else {
|
|
if err := kubernetesProvider.RemoveServicAccount(removalCtx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service Account %s in namespace %s: %v", mizu.ServiceAccountName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveRole(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleName); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Role %s in namespace %s: %v", mizu.RoleName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveRoleBinding(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing RoleBinding %s in namespace %s: %v", mizu.RoleBindingName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
}
|
|
}
|
|
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
waitUntilNamespaceDeleted(removalCtx, cancel, kubernetesProvider)
|
|
}
|
|
}
|
|
|
|
func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
|
|
// Call cancel if a terminating signal was received. Allows user to skip the wait.
|
|
go func() {
|
|
waitForFinish(ctx, cancel)
|
|
}()
|
|
|
|
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, config.Config.MizuResourcesNamespace); err != nil {
|
|
switch {
|
|
case ctx.Err() == context.Canceled:
|
|
logger.Log.Debugf("Do nothing. User interrupted the wait")
|
|
case err == wait.ErrWaitTimeout:
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", config.Config.MizuResourcesNamespace))
|
|
default:
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error while waiting for Namespace %s to be deleted: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
}
|
|
}
|
|
|
|
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc) {
|
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, config.Config.Tap.PodRegex())
|
|
|
|
restartTappers := func() {
|
|
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Failed to update currently tapped pods: %v", err))
|
|
cancel()
|
|
}
|
|
|
|
if !changeFound {
|
|
logger.Log.Debugf("Nothing changed update tappers not needed")
|
|
return
|
|
}
|
|
|
|
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
|
|
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
|
}
|
|
|
|
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
|
|
cancel()
|
|
}
|
|
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
|
|
cancel()
|
|
}
|
|
}
|
|
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
|
|
|
|
for {
|
|
select {
|
|
case pod := <-added:
|
|
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
|
restartTappersDebouncer.SetOn()
|
|
case pod := <-removed:
|
|
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
|
restartTappersDebouncer.SetOn()
|
|
case pod := <-modified:
|
|
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
|
|
// Act only if the modified pod has already obtained an IP address.
|
|
// After filtering for IPs, on a normal pod restart this includes the following events:
|
|
// - Pod deletion
|
|
// - Pod reaches start state
|
|
// - Pod reaches ready state
|
|
// Ready/unready transitions might also trigger this event.
|
|
if pod.Status.PodIP != "" {
|
|
restartTappersDebouncer.SetOn()
|
|
}
|
|
|
|
case err := <-errorChan:
|
|
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
|
|
restartTappersDebouncer.Cancel()
|
|
// TODO: Does this also perform cleanup?
|
|
cancel()
|
|
|
|
case <-ctx.Done():
|
|
logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
|
|
restartTappersDebouncer.Cancel()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespaces []string) (error, bool) {
|
|
changeFound := false
|
|
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), targetNamespaces); err != nil {
|
|
return err, false
|
|
} else {
|
|
podsToTap := excludeMizuPods(matchingPods)
|
|
addedPods, removedPods := getPodArrayDiff(state.currentlyTappedPods, podsToTap)
|
|
for _, addedPod := range addedPods {
|
|
changeFound = true
|
|
logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
|
|
}
|
|
for _, removedPod := range removedPods {
|
|
changeFound = true
|
|
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
|
|
}
|
|
state.currentlyTappedPods = podsToTap
|
|
}
|
|
|
|
return nil, changeFound
|
|
}
|
|
|
|
func excludeMizuPods(pods []core.Pod) []core.Pod {
|
|
mizuPrefixRegex := regexp.MustCompile("^" + mizu.MizuResourcesPrefix)
|
|
|
|
nonMizuPods := make([]core.Pod, 0)
|
|
for _, pod := range pods {
|
|
if !mizuPrefixRegex.MatchString(pod.Name) {
|
|
nonMizuPods = append(nonMizuPods, pod)
|
|
}
|
|
}
|
|
|
|
return nonMizuPods
|
|
}
|
|
|
|
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
|
|
added = getMissingPods(newPods, oldPods)
|
|
removed = getMissingPods(oldPods, newPods)
|
|
|
|
return added, removed
|
|
}
|
|
|
|
//returns pods present in pods1 array and missing in pods2 array
|
|
func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
|
missingPods := make([]core.Pod, 0)
|
|
for _, pod1 := range pods1 {
|
|
var found = false
|
|
for _, pod2 := range pods2 {
|
|
if pod1.UID == pod2.UID {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
missingPods = append(missingPods, pod1)
|
|
}
|
|
}
|
|
return missingPods
|
|
}
|
|
|
|
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
|
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
|
|
isPodReady := false
|
|
timeAfter := time.After(25 * time.Second)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Log.Debugf("Watching API Server pod loop, ctx done")
|
|
return
|
|
case <-added:
|
|
logger.Log.Debugf("Watching API Server pod loop, added")
|
|
continue
|
|
case <-removed:
|
|
logger.Log.Infof("%s removed", mizu.ApiServerPodName)
|
|
cancel()
|
|
return
|
|
case modifiedPod := <-modified:
|
|
if modifiedPod == nil {
|
|
logger.Log.Debugf("Watching API Server pod loop, modifiedPod with nil")
|
|
continue
|
|
}
|
|
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
|
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
|
isPodReady = true
|
|
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
|
|
|
url := GetApiServerUrl()
|
|
if err := apiserver.Provider.InitAndTestConnection(url); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
|
|
cancel()
|
|
break
|
|
}
|
|
logger.Log.Infof("Mizu is available at %s\n", url)
|
|
openBrowser(url)
|
|
requestForAnalysisIfNeeded()
|
|
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
|
|
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
|
}
|
|
}
|
|
case <-timeAfter:
|
|
if !isPodReady {
|
|
logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
|
|
cancel()
|
|
}
|
|
case <-errorChan:
|
|
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
|
|
cancel()
|
|
}
|
|
}
|
|
}
|
|
|
|
func requestForAnalysisIfNeeded() {
|
|
if !config.Config.Tap.Analysis {
|
|
return
|
|
}
|
|
if err := apiserver.Provider.RequestAnalysis(config.Config.Tap.AnalysisDestination, config.Config.Tap.SleepIntervalSec); err != nil {
|
|
logger.Log.Debugf("[Error] failed requesting for analysis %v", err)
|
|
}
|
|
}
|
|
|
|
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, error) {
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
err := kubernetesProvider.CreateMizuRBAC(ctx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName, mizu.ClusterRoleName, mizu.ClusterRoleBindingName, mizu.RBACVersion)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
} else {
|
|
err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName, mizu.RoleName, mizu.RoleBindingName, mizu.RBACVersion)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func getNodeHostToTappedPodIpsMap(tappedPods []core.Pod) map[string][]string {
|
|
nodeToTappedPodIPMap := make(map[string][]string, 0)
|
|
for _, pod := range tappedPods {
|
|
existingList := nodeToTappedPodIPMap[pod.Spec.NodeName]
|
|
if existingList == nil {
|
|
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string{pod.Status.PodIP}
|
|
} else {
|
|
nodeToTappedPodIPMap[pod.Spec.NodeName] = append(nodeToTappedPodIPMap[pod.Spec.NodeName], pod.Status.PodIP)
|
|
}
|
|
}
|
|
return nodeToTappedPodIPMap
|
|
}
|
|
|
|
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
|
|
if config.Config.Tap.AllNamespaces {
|
|
return []string{mizu.K8sAllNamespaces}
|
|
} else if len(config.Config.Tap.Namespaces) > 0 {
|
|
return mizu.Unique(config.Config.Tap.Namespaces)
|
|
} else {
|
|
return []string{kubernetesProvider.CurrentNamespace()}
|
|
}
|
|
}
|