Mirror of https://github.com/kubeshark/kubeshark.git, synced 2026-02-22 22:53:04 +00:00
Compare commits
14 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 1c35a9f143 | |
| | d6766a8ac5 | |
| | eaa21f0789 | |
| | ac6aee07f5 | |
| | 5b428c5636 | |
| | 894f97ca41 | |
| | 45f8c8a834 | |
| | 75ff80218b | |
| | 6255e1d4ef | |
| | 8ea606fb59 | |
| | c9d41c53ca | |
| | 3995dae16d | |
| | 447fcf1b31 | |
| | 7e7fa772e1 | |
README.md (92 lines changed)
@@ -23,87 +23,61 @@
</a>
</p>

<p>
<p align="center">
Mizu (by UP9) is now Kubeshark, read more about it <a href="https://www.kubeshark.co/mizu-is-now-kubeshark">here</a>.
<b>
<a href="https://github.com/kubeshark/kubeshark/releases/tag/38.1">Version 38.1</a> is out with <a href="https://docs.kubeshark.co/en/pcap_export_import">PCAP export/import</a>, <a href="https://docs.kubeshark.co/en/tcp_streams">TCP streams</a> and so much more. Read about it <a href="https://kubeshark.co/pcap-or-it-didnt-happen">here</a>.
</b>
</p>
Kubeshark, the API Traffic Viewer for kubernetes, provides deep visibility and monitoring of all API traffic and payloads going in, out and across containers and pods inside a Kubernetes cluster.

Think of a combination of Chrome Dev Tools, TCPDump and Wireshark, re-invented for Kubernetes.
**Kubeshark** is an API Traffic Viewer for [**Kubernetes**](https://kubernetes.io/) providing deep visibility and monitoring of all API traffic and payloads going in, out and across containers and Pods inside a Kubernetes cluster.



## Download
Think [TCPDump](https://en.wikipedia.org/wiki/Tcpdump) and [Wireshark](https://www.wireshark.org/) re-invented for Kubernetes

Kubeshark uses a ~45MB pre-compiled executable binary to communicate with the Kubernetes API. We recommend downloading the `kubeshark` CLI using one of these options:
## Getting Started

- Choose the right binary, then download and use it directly from [the latest stable release](https://github.com/kubeshark/kubeshark/releases/latest).

- Use the shell script below :point_down: to automatically download the right binary for your operating system and CPU architecture:

```shell
sh <(curl -Ls https://kubeshark.co/install)
```
- Compile it from source using the `make` command, then use the `./bin/kubeshark__` executable.

## Run

Use the `kubeshark` CLI to capture and view streaming API traffic in real time.
Download **Kubeshark**'s binary distribution from the [latest release](https://github.com/kubeshark/kubeshark/releases/latest) and run it following one of these examples:

```shell
kubeshark tap
```
### Troubleshooting Installation

If something doesn't work, or simply to play it safe prior to installing:

> Make sure you have access to https://hub.docker.com/

> Make sure the `kubeshark` executable is in your `PATH`.

### Select Pods

#### Monitoring a Specific Pod:

```shell
kubeshark tap catalogue-b87b45784-sxc8q
```

#### Monitoring a Set of Pods Using Regex:

```shell
kubeshark tap "(catalo*|front-end*)"
```
### Specify the Namespace

By default, Kubeshark targets the `default` namespace.
To specify a different namespace:

```shell
kubeshark tap -n sock-shop
```

### Specify All Namespaces

By default, Kubeshark waits for new pods to be created. To simply tap all existing namespaces, run:

```shell
kubeshark tap -A
```

The namespace and pod selectors can be combined, as in:

```shell
kubeshark tap -n sock-shop "(catalo*|front-end*)"
```

Running any of the :point_up: above commands will open the [Web UI](https://docs.kubeshark.co/en/ui) in your browser, which streams the traffic in your Kubernetes cluster in real time.
### Homebrew

[Homebrew](https://brew.sh/) :beer: users can add the Kubeshark formulae with:

```shell
brew tap kubeshark/kubeshark
```

and install the Kubeshark CLI with:

```shell
brew install kubeshark
```

## Building From Source

Clone this repository and run the `make` command to build it. After the build is complete, the executable can be found at `./bin/kubeshark__`.

## Documentation

Visit our documentation website: [docs.kubeshark.co](https://docs.kubeshark.co)

The documentation resources are open-source and can be found on GitHub: [kubeshark/docs](https://github.com/kubeshark/docs)
To learn more, read the [documentation](https://docs.kubeshark.co).

## Contributing

We ❤️ pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.
We :heart: pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.

## Code of Conduct
@@ -46,6 +46,8 @@ func init() {

	tapCmd.Flags().StringP(configStructs.DockerRegistryLabel, "r", defaultTapConfig.Docker.Registry, "The Docker registry that's hosting the images.")
	tapCmd.Flags().StringP(configStructs.DockerTagLabel, "t", defaultTapConfig.Docker.Tag, "The tag of the Docker images that are going to be pulled.")
	tapCmd.Flags().String(configStructs.DockerImagePullPolicy, defaultTapConfig.Docker.ImagePullPolicy, "ImagePullPolicy for the Docker images.")
	tapCmd.Flags().StringSlice(configStructs.DockerImagePullSecrets, defaultTapConfig.Docker.ImagePullSecrets, "ImagePullSecrets for the Docker images.")
	tapCmd.Flags().Uint16(configStructs.ProxyFrontPortLabel, defaultTapConfig.Proxy.Front.SrcPort, "Provide a custom port for the front-end proxy/port-forward.")
	tapCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub proxy/port-forward.")
	tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.")
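The two new flags (`docker-imagepullpolicy`, `docker-imagepullsecrets`) register alongside the existing Docker flags. Since `StringSlice` flags under cobra/pflag accept both repeated flags and comma-separated values, an invocation like `kubeshark tap --docker-imagepullsecrets regcred,other-secret` should populate the slice. A minimal standalone sketch of that behavior (the command below is illustrative, not the real `tapCmd`):

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use: "tap",
		Run: func(cmd *cobra.Command, args []string) {
			secrets, _ := cmd.Flags().GetStringSlice("docker-imagepullsecrets")
			// A StringSlice flag accepts either repeated flags or a
			// comma-separated list; both forms yield [regcred other-secret].
			fmt.Println(secrets)
		},
	}
	cmd.Flags().StringSlice("docker-imagepullsecrets", nil, "ImagePullSecrets for the Docker images.")

	// e.g. tap --docker-imagepullsecrets regcred --docker-imagepullsecrets other-secret
	// or   tap --docker-imagepullsecrets regcred,other-secret
	cmd.SetArgs([]string{"--docker-imagepullsecrets", "regcred,other-secret"})
	_ = cmd.Execute()
}
```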
@@ -71,9 +71,9 @@ func tap() {
		}
	}

	log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:")
	log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targeting pods in:")

	if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
	if err := printTargetedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
		log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!")
	}
@@ -82,7 +82,7 @@ func tap() {
	}

	log.Info().Msg(fmt.Sprintf("Waiting for the creation of %s resources...", misc.Software))
	if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil {
	if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.ImagePullSecrets(), config.Config.Tap.Debug); err != nil {
		var statusError *k8serrors.StatusError
		if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
			log.Warn().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance", misc.Software, misc.Program, misc.Software))
@@ -113,15 +113,15 @@ This function is a bit problematic as it might be detached from the actual pods
The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
*/
func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
	if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
		return err
	} else {
		if len(matchingPods) == 0 {
			printNoPodsFoundSuggestion(namespaces)
		}
		for _, targettedPod := range matchingPods {
			log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name)))
		for _, targetedPod := range matchingPods {
			log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
		}
		return nil
	}
@@ -134,6 +134,7 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider
		SelfNamespace:            config.Config.SelfNamespace,
		WorkerResources:          config.Config.Tap.Resources.Worker,
		ImagePullPolicy:          config.Config.ImagePullPolicy(),
		ImagePullSecrets:         config.Config.ImagePullSecrets(),
		SelfServiceAccountExists: state.selfServiceAccountExists,
		ServiceMesh:              config.Config.Tap.ServiceMesh,
		Tls:                      config.Config.Tap.Tls,
@@ -159,7 +160,7 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider
			log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
			return
		}
		go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods)
		go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods)
	case pod, ok := <-workerSyncer.WorkerPodsChanges:
		if !ok {
			log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
@@ -187,7 +188,7 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) {
func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string {
	switch err.TapManagerReason {
	case kubernetes.TapManagerPodListError:
		return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError)
		return fmt.Sprintf("Failed to update currently targeted pods: %v", err.OriginalError)
	case kubernetes.TapManagerPodWatchError:
		return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError)
	case kubernetes.TapManagerWorkerUpdateError:
@@ -45,6 +45,15 @@ func (config *ConfigStruct) ImagePullPolicy() v1.PullPolicy {
	return v1.PullPolicy(config.Tap.Docker.ImagePullPolicy)
}

func (config *ConfigStruct) ImagePullSecrets() []v1.LocalObjectReference {
	var ref []v1.LocalObjectReference
	for _, name := range config.Tap.Docker.ImagePullSecrets {
		ref = append(ref, v1.LocalObjectReference{Name: name})
	}

	return ref
}

func (config *ConfigStruct) IsNsRestrictedMode() bool {
	return config.SelfNamespace != misc.Program // Notice "kubeshark" string must match the default SelfNamespace
}
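The new `ImagePullSecrets` helper wraps each configured secret name in a `core.LocalObjectReference`. A minimal standalone sketch of that mapping and where such references land on a Pod spec (the secret names here are hypothetical):

```go
package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
)

// imagePullSecrets mirrors the new ConfigStruct.ImagePullSecrets helper:
// each plain secret name becomes a core.LocalObjectReference.
func imagePullSecrets(names []string) []core.LocalObjectReference {
	var refs []core.LocalObjectReference
	for _, name := range names {
		refs = append(refs, core.LocalObjectReference{Name: name})
	}
	return refs
}

func main() {
	// Hypothetical secret names, as they might appear under
	// tap.docker.imagepullsecrets in the config file.
	spec := core.PodSpec{
		ImagePullSecrets: imagePullSecrets([]string{"regcred", "private-mirror"}),
	}
	fmt.Println(spec.ImagePullSecrets) // [{regcred} {private-mirror}]
}
```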
@@ -10,34 +10,36 @@ import (
)

const (
	DockerRegistryLabel = "docker-registry"
	DockerTagLabel      = "docker-tag"
	ProxyFrontPortLabel = "proxy-front-port"
	ProxyHubPortLabel   = "proxy-hub-port"
	ProxyHostLabel      = "proxy-host"
	NamespacesLabel     = "namespaces"
	AllNamespacesLabel  = "allnamespaces"
	StorageLimitLabel   = "storagelimit"
	DryRunLabel         = "dryrun"
	PcapLabel           = "pcap"
	ServiceMeshLabel    = "servicemesh"
	TlsLabel            = "tls"
	DebugLabel          = "debug"
	DockerRegistryLabel    = "docker-registry"
	DockerTagLabel         = "docker-tag"
	DockerImagePullPolicy  = "docker-imagepullpolicy"
	DockerImagePullSecrets = "docker-imagepullsecrets"
	ProxyFrontPortLabel    = "proxy-front-port"
	ProxyHubPortLabel      = "proxy-hub-port"
	ProxyHostLabel         = "proxy-host"
	NamespacesLabel        = "namespaces"
	AllNamespacesLabel     = "allnamespaces"
	StorageLimitLabel      = "storagelimit"
	DryRunLabel            = "dryrun"
	PcapLabel              = "pcap"
	ServiceMeshLabel       = "servicemesh"
	TlsLabel               = "tls"
	DebugLabel             = "debug"
)
type WorkerConfig struct {
	SrcPort uint16 `yaml:"src-port" default:"8897"`
	DstPort uint16 `yaml:"dst-port" default:"8897"`
	SrcPort uint16 `yaml:"port" default:"8897"`
	DstPort uint16 `yaml:"srvport" default:"8897"`
}

type HubConfig struct {
	SrcPort uint16 `yaml:"src-port" default:"8898"`
	DstPort uint16 `yaml:"dst-port" default:"8898"`
	SrcPort uint16 `yaml:"port" default:"8898"`
	DstPort uint16 `yaml:"srvport" default:"8898"`
}

type FrontConfig struct {
	SrcPort uint16 `yaml:"src-port" default:"8899"`
	DstPort uint16 `yaml:"dst-port" default:"80"`
	SrcPort uint16 `yaml:"port" default:"8899"`
	DstPort uint16 `yaml:"srvport" default:"80"`
}

type ProxyConfig struct {
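The struct-tag change renames the YAML keys from `src-port`/`dst-port` to `port`/`srvport`, so config files written with the old keys would presumably stop matching. A small sketch of the new shape, using `gopkg.in/yaml.v3` (which may not be the YAML library the project actually uses):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// FrontConfig mirrors the renamed tags from the diff.
type FrontConfig struct {
	SrcPort uint16 `yaml:"port"`
	DstPort uint16 `yaml:"srvport"`
}

func main() {
	// Keys match the new yaml tags; the old names were src-port/dst-port.
	data := []byte("port: 8899\nsrvport: 80\n")
	var front FrontConfig
	if err := yaml.Unmarshal(data, &front); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", front) // {SrcPort:8899 DstPort:80}
}
```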
@@ -48,9 +50,10 @@ type ProxyConfig struct {
}

type DockerConfig struct {
	Registry        string `yaml:"registry" default:"docker.io/kubeshark"`
	Tag             string `yaml:"tag" default:"latest"`
	ImagePullPolicy string `yaml:"imagepullpolicy" default:"Always"`
	Registry         string   `yaml:"registry" default:"docker.io/kubeshark"`
	Tag              string   `yaml:"tag" default:"latest"`
	ImagePullPolicy  string   `yaml:"imagepullpolicy" default:"Always"`
	ImagePullSecrets []string `yaml:"imagepullsecrets"`
}

type ResourcesConfig struct {
@@ -18,7 +18,7 @@ func FormatError(err error) error {
	if k8serrors.IsForbidden(err) {
		errorNew = fmt.Errorf("insufficient permissions: %w. "+
			"supply the required permission or control %s's access to namespaces by setting %s "+
			"in the config file or setting the targetted namespace with --%s %s=<NAMEPSACE>",
			"in the config file or setting the targeted namespace with --%s %s=<NAMEPSACE>",
			err,
			misc.Software,
			config.SelfNamespaceConfigName,
go.mod (2 lines changed)
@@ -8,7 +8,7 @@ require (
	github.com/docker/go-connections v0.4.0
	github.com/docker/go-units v0.4.0
	github.com/google/go-github/v37 v37.0.0
	github.com/kubeshark/base v0.5.0
	github.com/kubeshark/base v0.5.1
	github.com/rs/zerolog v1.28.0
	github.com/spf13/cobra v1.3.0
	github.com/spf13/pflag v1.0.5
go.sum (4 lines changed)
@@ -414,8 +414,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubeshark/base v0.5.0 h1:ZQ/wNIdmLeDwy143korRZyPorcqBgf1y/tQIiIenAL0=
github.com/kubeshark/base v0.5.0/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/kubeshark/base v0.5.1 h1:msy1iQLgWQK1COoicwWxEDbeXU9J5RuptA5fYeOEzfA=
github.com/kubeshark/base v0.5.1/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
@@ -116,22 +116,22 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) {
	}
}

func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) {
	postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url)
func (connector *Connector) PostTargetedPodsToHub(pods []core.Pod) {
	postTargetedUrl := fmt.Sprintf("%s/pods/targeted", connector.url)

	if podsMarshalled, err := json.Marshal(pods); err != nil {
		log.Error().Err(err).Msg("Failed to marshal the targetted pods:")
		log.Error().Err(err).Msg("Failed to marshal the targeted pods:")
	} else {
		ok := false
		for !ok {
			if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
			if _, err = utils.Post(postTargetedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
				if _, ok := err.(*url.Error); ok {
					break
				}
				log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:")
				log.Debug().Err(err).Msg("Failed sending the targeted pods to Hub:")
			} else {
				ok = true
				log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:")
				log.Debug().Int("pod-count", len(pods)).Msg("Reported targeted pods to Hub:")
			}
			time.Sleep(time.Second)
		}
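The posting loop above retries once per second until the Hub accepts the payload, bailing out only when the error is a transport-level `*url.Error`. A stripped-down sketch of that shape; how `utils.Post` classifies errors is an assumption, since its code is not part of this diff:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// postOnce mimics what utils.Post is assumed to do: transport failures
// surface as *url.Error from net/http, while non-2xx responses come back
// as ordinary errors.
func postOnce(client *http.Client, endpoint string, body []byte) error {
	resp, err := client.Post(endpoint, "application/json", bytes.NewBuffer(body))
	if err != nil {
		return err // always a *url.Error from net/http
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

func main() {
	client := &http.Client{Timeout: 5 * time.Second}
	// Hypothetical Hub endpoint, echoing the "/pods/targeted" path above.
	endpoint := "http://localhost:8898/pods/targeted"

	for {
		err := postOnce(client, endpoint, []byte(`[]`))
		if err == nil {
			fmt.Println("posted")
			return
		}
		if _, ok := err.(*url.Error); ok {
			fmt.Println("transport error, giving up:", err)
			return // the same early exit the diff takes on *url.Error
		}
		fmt.Println("retrying:", err)
		time.Sleep(time.Second)
	}
}
```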
@@ -177,6 +177,7 @@ type PodOptions struct {
	ServiceAccountName string
	Resources          Resources
	ImagePullPolicy    core.PullPolicy
	ImagePullSecrets   []core.LocalObjectReference
	Debug              bool
}
@@ -251,6 +252,7 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
					Effect: core.TaintEffectNoSchedule,
				},
			},
			ImagePullSecrets: opts.ImagePullSecrets,
		},
	}
@@ -353,6 +355,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
					Effect: core.TaintEffectNoSchedule,
				},
			},
			ImagePullSecrets: opts.ImagePullSecrets,
		},
	}
@@ -664,6 +667,7 @@ func (provider *Provider) ApplyWorkerDaemonSet(
	serviceAccountName string,
	resources Resources,
	imagePullPolicy core.PullPolicy,
	imagePullSecrets []core.LocalObjectReference,
	serviceMesh bool,
	tls bool,
	debug bool,
@@ -812,6 +816,14 @@ func (provider *Provider) ApplyWorkerDaemonSet(
	podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
	podSpec.WithVolumes(procfsVolume, sysfsVolume)

	if len(imagePullSecrets) > 0 {
		localObjectReference := applyconfcore.LocalObjectReference()
		for _, secret := range imagePullSecrets {
			localObjectReference.WithName(secret.Name)
		}
		podSpec.WithImagePullSecrets(localObjectReference)
	}

	podTemplate := applyconfcore.PodTemplateSpec()
	podTemplate.WithLabels(map[string]string{
		"app": workerPodName,
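Note that the loop above reuses a single `LocalObjectReference` and calls `WithName` on it repeatedly, so with more than one configured secret only the last name would survive. A sketch with `k8s.io/client-go/applyconfigurations/core/v1` building a fresh reference per iteration, which avoids that (the secret names are hypothetical):

```go
package main

import (
	"fmt"

	applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
)

func main() {
	secrets := []string{"regcred", "private-mirror"} // hypothetical names

	podSpec := applyconfcore.PodSpec()
	for _, name := range secrets {
		// One apply-configuration reference per secret.
		podSpec.WithImagePullSecrets(applyconfcore.LocalObjectReference().WithName(name))
	}

	fmt.Println(len(podSpec.ImagePullSecrets)) // 2
}
```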
@@ -8,19 +8,19 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap {
	nodeToTargettedPodsMap := make(models.NodeToPodsMap)
	for _, pod := range targettedPods {
func GetNodeHostToTargetedPodsMap(targetedPods []core.Pod) models.NodeToPodsMap {
	nodeToTargetedPodsMap := make(models.NodeToPodsMap)
	for _, pod := range targetedPods {
		minimizedPod := getMinimizedPod(pod)

		existingList := nodeToTargettedPodsMap[pod.Spec.NodeName]
		existingList := nodeToTargetedPodsMap[pod.Spec.NodeName]
		if existingList == nil {
			nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
			nodeToTargetedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
		} else {
			nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod)
			nodeToTargetedPodsMap[pod.Spec.NodeName] = append(nodeToTargetedPodsMap[pod.Spec.NodeName], minimizedPod)
		}
	}
	return nodeToTargettedPodsMap
	return nodeToTargetedPodsMap
}

func getMinimizedPod(fullPod core.Pod) core.Pod {
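`GetNodeHostToTargetedPodsMap` buckets the targeted pods by `Spec.NodeName`. The same grouping in miniature, using a plain `map[string][]core.Pod` in place of `models.NodeToPodsMap` (whose definition is not shown in this diff); since appending to a nil slice is valid in Go, the explicit nil check above can collapse into a single `append`:

```go
package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// groupPodsByNode mirrors GetNodeHostToTargetedPodsMap: each pod lands in
// a per-node bucket keyed by Spec.NodeName.
func groupPodsByNode(pods []core.Pod) map[string][]core.Pod {
	nodeToPods := make(map[string][]core.Pod)
	for _, pod := range pods {
		nodeToPods[pod.Spec.NodeName] = append(nodeToPods[pod.Spec.NodeName], pod)
	}
	return nodeToPods
}

func main() {
	// Hypothetical pods spread across two nodes.
	mkPod := func(name, node string) core.Pod {
		return core.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: name},
			Spec:       core.PodSpec{NodeName: node},
		}
	}
	grouped := groupPodsByNode([]core.Pod{
		mkPod("front-end-1", "node-a"),
		mkPod("catalogue-1", "node-a"),
		mkPod("front-end-2", "node-b"),
	})
	for node, pods := range grouped {
		fmt.Println(node, len(pods))
	}
}
```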
@@ -17,23 +17,23 @@ import (

const updateWorkersDelay = 5 * time.Second

type TargettedPodChangeEvent struct {
type TargetedPodChangeEvent struct {
	Added   []v1.Pod
	Removed []v1.Pod
}

// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
type WorkerSyncer struct {
	startTime              time.Time
	context                context.Context
	CurrentlyTargettedPods []v1.Pod
	config                 WorkerSyncerConfig
	kubernetesProvider     *Provider
	TapPodChangesOut       chan TargettedPodChangeEvent
	WorkerPodsChanges      chan *v1.Pod
	ErrorOut               chan K8sTapManagerError
	nodeToTargettedPodMap  models.NodeToPodsMap
	targettedNodes         []string
	startTime             time.Time
	context               context.Context
	CurrentlyTargetedPods []v1.Pod
	config                WorkerSyncerConfig
	kubernetesProvider    *Provider
	TapPodChangesOut      chan TargetedPodChangeEvent
	WorkerPodsChanges     chan *v1.Pod
	ErrorOut              chan K8sTapManagerError
	nodeToTargetedPodMap  models.NodeToPodsMap
	targetedNodes         []string
}

type WorkerSyncerConfig struct {
@@ -42,6 +42,7 @@ type WorkerSyncerConfig struct {
	SelfNamespace            string
	WorkerResources          Resources
	ImagePullPolicy          v1.PullPolicy
	ImagePullSecrets         []v1.LocalObjectReference
	SelfServiceAccountExists bool
	ServiceMesh              bool
	Tls                      bool
@@ -50,17 +51,17 @@ type WorkerSyncerConfig struct {

func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
	syncer := &WorkerSyncer{
		startTime:              startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
		context:                ctx,
		CurrentlyTargettedPods: make([]v1.Pod, 0),
		config:                 config,
		kubernetesProvider:     kubernetesProvider,
		TapPodChangesOut:       make(chan TargettedPodChangeEvent, 100),
		WorkerPodsChanges:      make(chan *v1.Pod, 100),
		ErrorOut:               make(chan K8sTapManagerError, 100),
		startTime:             startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
		context:               ctx,
		CurrentlyTargetedPods: make([]v1.Pod, 0),
		config:                config,
		kubernetesProvider:    kubernetesProvider,
		TapPodChangesOut:      make(chan TargetedPodChangeEvent, 100),
		WorkerPodsChanges:     make(chan *v1.Pod, 100),
		ErrorOut:              make(chan K8sTapManagerError, 100),
	}

	if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil {
	if err, _ := syncer.updateCurrentlyTargetedPods(); err != nil {
		return nil, err
	}
@@ -68,7 +69,7 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide
		return nil, err
	}

	go syncer.watchPodsForTargetting()
	go syncer.watchPodsForTargeting()
	go syncer.watchWorkerEvents()
	go syncer.watchWorkerPods()
	return syncer, nil
@@ -178,12 +179,12 @@ func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
	}
}

func (workerSyncer *WorkerSyncer) watchPodsForTargetting() {
func (workerSyncer *WorkerSyncer) watchPodsForTargeting() {
	podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
	eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)

	handleChangeInPods := func() {
		err, changeFound := workerSyncer.updateCurrentlyTargettedPods()
		err, changeFound := workerSyncer.updateCurrentlyTargetedPods()
		if err != nil {
			workerSyncer.ErrorOut <- K8sTapManagerError{
				OriginalError: err,
@@ -298,22 +299,22 @@ func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorke
	}
}

func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) {
func (workerSyncer *WorkerSyncer) updateCurrentlyTargetedPods() (err error, changesFound bool) {
	if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
		return err, false
	} else {
		podsToTarget := excludeSelfPods(matchingPods)
		addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget)
		addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargetedPods, podsToTarget)
		for _, addedPod := range addedPods {
			log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:")
			log.Info().Str("pod", addedPod.Name).Msg("Currently targeting:")
		}
		for _, removedPod := range removedPods {
			log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.")
			log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targeting is stopped.")
		}
		if len(addedPods) > 0 || len(removedPods) > 0 {
			workerSyncer.CurrentlyTargettedPods = podsToTarget
			workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods)
			workerSyncer.TapPodChangesOut <- TargettedPodChangeEvent{
			workerSyncer.CurrentlyTargetedPods = podsToTarget
			workerSyncer.nodeToTargetedPodMap = GetNodeHostToTargetedPodsMap(workerSyncer.CurrentlyTargetedPods)
			workerSyncer.TapPodChangesOut <- TargetedPodChangeEvent{
				Added:   addedPods,
				Removed: removedPods,
			}
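`getPodArrayDiff` is not shown in this hunk; presumably it compares the previous and new target sets to produce the added/removed split fed into `TargetedPodChangeEvent`. A minimal sketch of such a set diff, keyed by plain names rather than `v1.Pod` values:

```go
package main

import "fmt"

// podDiff returns which names are new and which disappeared between two
// snapshots — the same added/removed split updateCurrentlyTargetedPods
// consumes. The real getPodArrayDiff works on v1.Pod values.
func podDiff(previous, current []string) (added, removed []string) {
	prev := make(map[string]bool, len(previous))
	for _, name := range previous {
		prev[name] = true
	}
	curr := make(map[string]bool, len(current))
	for _, name := range current {
		curr[name] = true
		if !prev[name] {
			added = append(added, name)
		}
	}
	for _, name := range previous {
		if !curr[name] {
			removed = append(removed, name)
		}
	}
	return added, removed
}

func main() {
	added, removed := podDiff(
		[]string{"catalogue-1", "front-end-1"},
		[]string{"front-end-1", "front-end-2"},
	)
	fmt.Println("added:", added, "removed:", removed) // added: [front-end-2] removed: [catalogue-1]
}
```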
@@ -324,14 +325,14 @@ func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, cha
	}
}

func (workerSyncer *WorkerSyncer) updateWorkers() error {
	nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap))
	nodesToTarget := make([]string, len(workerSyncer.nodeToTargetedPodMap))
	i := 0
	for node := range workerSyncer.nodeToTargettedPodMap {
	for node := range workerSyncer.nodeToTargetedPodMap {
		nodesToTarget[i] = node
		i++
	}

	if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) {
	if utils.EqualStringSlices(nodesToTarget, workerSyncer.targetedNodes) {
		log.Debug().Msg("Skipping apply, DaemonSet is up to date")
		return nil
	}
@@ -340,7 +341,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {

	image := docker.GetWorkerImage()

	if len(workerSyncer.nodeToTargettedPodMap) > 0 {
	if len(workerSyncer.nodeToTargetedPodMap) > 0 {
		var serviceAccountName string
		if workerSyncer.config.SelfServiceAccountExists {
			serviceAccountName = ServiceAccountName
@@ -348,8 +349,8 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
			serviceAccountName = ""
		}

		nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap))
		for nodeName := range workerSyncer.nodeToTargettedPodMap {
		nodeNames := make([]string, 0, len(workerSyncer.nodeToTargetedPodMap))
		for nodeName := range workerSyncer.nodeToTargetedPodMap {
			nodeNames = append(nodeNames, nodeName)
		}
@@ -363,13 +364,14 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
			serviceAccountName,
			workerSyncer.config.WorkerResources,
			workerSyncer.config.ImagePullPolicy,
			workerSyncer.config.ImagePullSecrets,
			workerSyncer.config.ServiceMesh,
			workerSyncer.config.Tls,
			workerSyncer.config.Debug); err != nil {
			return err
		}

		log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.")
		log.Debug().Int("worker-count", len(workerSyncer.nodeToTargetedPodMap)).Msg("Successfully created workers.")
	} else {
		if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
			workerSyncer.context,
@@ -383,7 +385,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
		log.Debug().Msg("Successfully resetted Worker DaemonSet")
	}

	workerSyncer.targettedNodes = nodesToTarget
	workerSyncer.targetedNodes = nodesToTarget

	return nil
}
kubeshark.go (15 lines changed)
@@ -2,6 +2,7 @@ package main

import (
	"os"
	"strconv"
	"time"

	"github.com/kubeshark/kubeshark/cmd"
@@ -11,6 +12,20 @@ import (

func main() {
	zerolog.SetGlobalLevel(zerolog.InfoLevel)

	// Short caller (file:line)
	zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string {
		short := file
		for i := len(file) - 1; i > 0; i-- {
			if file[i] == '/' {
				short = file[i+1:]
				break
			}
		}
		file = short
		return file + ":" + strconv.Itoa(line)
	}

	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Caller().Logger()
	cmd.Execute()
}
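The `CallerMarshalFunc` override trims each caller path to its final `file:line` component before `ConsoleWriter` prints it, so "cmd/tap.go:71" logs as "tap.go:71". The same effect in a standalone sketch, replacing the hand-rolled loop with `strings.LastIndexByte` (assumes zerolog v1.26+, where the hook takes the `pc` parameter, matching the v1.28.0 pinned in go.mod):

```go
package main

import (
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

func main() {
	// Keep only the last path element of the caller's file name.
	zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string {
		if i := strings.LastIndexByte(file, '/'); i >= 0 {
			file = file[i+1:]
		}
		return file + ":" + strconv.Itoa(line)
	}

	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Caller().Logger()
	log.Info().Msg("caller is now short")
}
```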
@@ -13,7 +13,7 @@ import (
	core "k8s.io/api/core/v1"
)

func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) {
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, imagePullSecrets []core.LocalObjectReference, debug bool) (bool, error) {
	if !isNsRestrictedMode {
		if err := createSelfNamespace(ctx, kubernetesProvider, selfNamespace); err != nil {
			return false, err
@@ -39,6 +39,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
		ServiceAccountName: serviceAccountName,
		Resources:          hubResources,
		ImagePullPolicy:    imagePullPolicy,
		ImagePullSecrets:   imagePullSecrets,
		Debug:              debug,
	}

@@ -49,6 +50,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
		ServiceAccountName: serviceAccountName,
		Resources:          hubResources,
		ImagePullPolicy:    imagePullPolicy,
		ImagePullSecrets:   imagePullSecrets,
		Debug:              debug,
	}