Compare commits

...

14 Commits
38.0 ... 38.2

Author SHA1 Message Date
M. Mert Yildiran
1c35a9f143 🐛 Fix the parsing of --proxy-front-port and --proxy-hub-port options 2023-01-15 02:31:47 +03:00
M. Mert Yildiran
d6766a8ac5 🐛 Fix .spec.template.spec.imagePullSecrets: element 0: associative list with keys has an element that omits key field \"name\" error 2023-01-08 21:47:00 +03:00
M. Mert Yildiran
eaa21f0789 🔨 Fix the targetted typo 2023-01-08 21:44:12 +03:00
M. Mert Yildiran
ac6aee07f5 Update README.md 2023-01-07 14:36:26 +03:00
M. Mert Yildiran
5b428c5636 Update README.md 2023-01-07 14:35:12 +03:00
M. Mert Yildiran
894f97ca41 Add an option to set the ImagePullSecrets 2023-01-07 14:20:01 +03:00
Atıl Sensalduz
45f8c8a834 docs: add how to install step for brew (#1286) 2023-01-03 15:14:38 -08:00
Alon Girmonsky
75ff80218b Update README.md 2022-12-31 14:14:19 -08:00
Alon Girmonsky
6255e1d4ef Update README.md 2022-12-31 14:10:32 -08:00
Alon Girmonsky
8ea606fb59 Update README.md 2022-12-31 10:15:05 -08:00
Alon Girmonsky
c9d41c53ca Update README.md 2022-12-30 21:45:13 -08:00
Alon Girmonsky
3995dae16d Update README.md 2022-12-30 21:41:56 -08:00
Alon Girmonsky
447fcf1b31 Change the center message
From Mizu to Distributed PCAP storage
2022-12-30 21:39:29 -08:00
M. Mert Yildiran
7e7fa772e1 🐛 Have a short caller (file:line) log 2022-12-30 08:30:48 +03:00
14 changed files with 165 additions and 145 deletions

View File

@@ -23,87 +23,61 @@
</a>
</p>
<p>
<p align="center">
Mizu (by UP9) is now Kubeshark, read more about it <a href="https://www.kubeshark.co/mizu-is-now-kubeshark">here</a>.
<b>
<a href="https://github.com/kubeshark/kubeshark/releases/tag/38.1">Version 38.1</a> is out with <a href="https://docs.kubeshark.co/en/pcap_export_import">PCAP export/import</a>, <a href="https://docs.kubeshark.co/en/tcp_streams">TCP streams</a> and so much more. Read about it <a href="https://kubeshark.co/pcap-or-it-didnt-happen">here</a>.
</b>
</p>
Kubeshark, the API Traffic Viewer for kubernetes, provides deep visibility and monitoring of all API traffic and payloads going in, out and across containers and pods inside a Kubernetes cluster.
Think of a combination of Chrome Dev Tools, TCPDump and Wireshark, re-invented for Kubernetes.
**Kubeshark** is an API Traffic Viewer for [**Kubernetes**](https://kubernetes.io/) providing deep visibility and monitoring of all API traffic and payloads going in, out and across containers and Pods inside a Kubernetes cluster.
![Simple UI](https://github.com/kubeshark/assets/raw/master/png/kubeshark-ui.png)
## Download
Think [TCPDump](https://en.wikipedia.org/wiki/Tcpdump) and [Wireshark](https://www.wireshark.org/) re-invented for Kubernetes
Kubeshark uses a ~45MB pre-compiled executable binary to communicate with the Kubernetes API. We recommend downloading the `kubeshark` CLI by using one of these options:
## Getting Started
- Choose the right binary, download and use directly from [the latest stable release](https://github.com/kubeshark/kubeshark/releases/latest).
- Use the shell script below :point_down: to automatically download the right binary for your operating system and CPU architecture:
```shell
sh <(curl -Ls https://kubeshark.co/install)
```
- Compile it from source using `make` command then use `./bin/kubeshark__` executable.
## Run
Use the `kubeshark` CLI to capture and view streaming API traffic in real time.
Download **Kubeshark**'s binary distribution [latest release](https://github.com/kubeshark/kubeshark/releases/latest) and run following one of these examples:
```shell
kubeshark tap
```
### Troubleshooting Installation
If something doesn't work or simply to play it safe prior to installing;
> Make sure you have access to https://hub.docker.com/
> Make sure `kubeshark` executable in your `PATH`.
### Select Pods
#### Monitoring a Specific Pod:
```shell
kubeshark tap catalogue-b87b45784-sxc8q
```
#### Monitoring a Set of Pods Using Regex:
```shell
kubeshark tap "(catalo*|front-end*)"
```
### Specify the Namespace
By default, Kubeshark targets the `default` namespace.
To specify a different namespace:
```
kubeshark tap -n sock-shop
```
### Specify All Namespaces
The default strategy of Kubeshark waits for the new pods
to be created. To simply tap all existing namespaces run:
```
kubeshark tap -A
```
```shell
kubeshark tap -n sock-shop "(catalo*|front-end*)"
```
Running any of the :point_up: above commands will open the [Web UI](https://docs.kubeshark.co/en/ui) in your browser which streams the traffic in your Kubernetes cluster in real-time.
### Homebrew
[Homebrew](https://brew.sh/) :beer: users can add Kubeshark formulae with:
```shell
brew tap kubeshark/kubeshark
```
and install Kubeshark CLI with:
```shell
brew install kubeshark
```
## Building From Source
Clone this repository and run `make` command to build it. After the build is complete, the executable can be found at `./bin/kubeshark__`.
## Documentation
Visit our documentation website: [docs.kubeshark.co](https://docs.kubeshark.co)
The documentation resources are open-source and can be found on GitHub: [kubeshark/docs](https://github.com/kubeshark/docs)
To learn more, read the [documentation](https://docs.kubeshark.co).
## Contributing
We ❤️ pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.
We :heart: pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.
## Code of Conduct

View File

@@ -46,6 +46,8 @@ func init() {
tapCmd.Flags().StringP(configStructs.DockerRegistryLabel, "r", defaultTapConfig.Docker.Registry, "The Docker registry that's hosting the images.")
tapCmd.Flags().StringP(configStructs.DockerTagLabel, "t", defaultTapConfig.Docker.Tag, "The tag of the Docker images that are going to be pulled.")
tapCmd.Flags().String(configStructs.DockerImagePullPolicy, defaultTapConfig.Docker.ImagePullPolicy, "ImagePullPolicy for the Docker images.")
tapCmd.Flags().StringSlice(configStructs.DockerImagePullSecrets, defaultTapConfig.Docker.ImagePullSecrets, "ImagePullSecrets for the Docker images.")
tapCmd.Flags().Uint16(configStructs.ProxyFrontPortLabel, defaultTapConfig.Proxy.Front.SrcPort, "Provide a custom port for the front-end proxy/port-forward.")
tapCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub proxy/port-forward.")
tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.")

View File

@@ -71,9 +71,9 @@ func tap() {
}
}
log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:")
log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targeting pods in:")
if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
if err := printTargetedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!")
}
@@ -82,7 +82,7 @@ func tap() {
}
log.Info().Msg(fmt.Sprintf("Waiting for the creation of %s resources...", misc.Software))
if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil {
if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.ImagePullSecrets(), config.Config.Tap.Debug); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
log.Warn().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance", misc.Software, misc.Program, misc.Software))
@@ -113,15 +113,15 @@ This function is a bit problematic as it might be detached from the actual pods
The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
*/
func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
return err
} else {
if len(matchingPods) == 0 {
printNoPodsFoundSuggestion(namespaces)
}
for _, targettedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name)))
for _, targetedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
}
return nil
}
@@ -134,6 +134,7 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider
SelfNamespace: config.Config.SelfNamespace,
WorkerResources: config.Config.Tap.Resources.Worker,
ImagePullPolicy: config.Config.ImagePullPolicy(),
ImagePullSecrets: config.Config.ImagePullSecrets(),
SelfServiceAccountExists: state.selfServiceAccountExists,
ServiceMesh: config.Config.Tap.ServiceMesh,
Tls: config.Config.Tap.Tls,
@@ -159,7 +160,7 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider
log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
return
}
go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods)
go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods)
case pod, ok := <-workerSyncer.WorkerPodsChanges:
if !ok {
log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
@@ -187,7 +188,7 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) {
func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string {
switch err.TapManagerReason {
case kubernetes.TapManagerPodListError:
return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError)
return fmt.Sprintf("Failed to update currently targeted pods: %v", err.OriginalError)
case kubernetes.TapManagerPodWatchError:
return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError)
case kubernetes.TapManagerWorkerUpdateError:

View File

@@ -45,6 +45,15 @@ func (config *ConfigStruct) ImagePullPolicy() v1.PullPolicy {
return v1.PullPolicy(config.Tap.Docker.ImagePullPolicy)
}
func (config *ConfigStruct) ImagePullSecrets() []v1.LocalObjectReference {
var ref []v1.LocalObjectReference
for _, name := range config.Tap.Docker.ImagePullSecrets {
ref = append(ref, v1.LocalObjectReference{Name: name})
}
return ref
}
func (config *ConfigStruct) IsNsRestrictedMode() bool {
return config.SelfNamespace != misc.Program // Notice "kubeshark" string must match the default SelfNamespace
}

View File

@@ -10,34 +10,36 @@ import (
)
const (
DockerRegistryLabel = "docker-registry"
DockerTagLabel = "docker-tag"
ProxyFrontPortLabel = "proxy-front-port"
ProxyHubPortLabel = "proxy-hub-port"
ProxyHostLabel = "proxy-host"
NamespacesLabel = "namespaces"
AllNamespacesLabel = "allnamespaces"
StorageLimitLabel = "storagelimit"
DryRunLabel = "dryrun"
PcapLabel = "pcap"
ServiceMeshLabel = "servicemesh"
TlsLabel = "tls"
DebugLabel = "debug"
DockerRegistryLabel = "docker-registry"
DockerTagLabel = "docker-tag"
DockerImagePullPolicy = "docker-imagepullpolicy"
DockerImagePullSecrets = "docker-imagepullsecrets"
ProxyFrontPortLabel = "proxy-front-port"
ProxyHubPortLabel = "proxy-hub-port"
ProxyHostLabel = "proxy-host"
NamespacesLabel = "namespaces"
AllNamespacesLabel = "allnamespaces"
StorageLimitLabel = "storagelimit"
DryRunLabel = "dryrun"
PcapLabel = "pcap"
ServiceMeshLabel = "servicemesh"
TlsLabel = "tls"
DebugLabel = "debug"
)
type WorkerConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8897"`
DstPort uint16 `yaml:"dst-port" default:"8897"`
SrcPort uint16 `yaml:"port" default:"8897"`
DstPort uint16 `yaml:"srvport" default:"8897"`
}
type HubConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8898"`
DstPort uint16 `yaml:"dst-port" default:"8898"`
SrcPort uint16 `yaml:"port" default:"8898"`
DstPort uint16 `yaml:"srvport" default:"8898"`
}
type FrontConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8899"`
DstPort uint16 `yaml:"dst-port" default:"80"`
SrcPort uint16 `yaml:"port" default:"8899"`
DstPort uint16 `yaml:"srvport" default:"80"`
}
type ProxyConfig struct {
@@ -48,9 +50,10 @@ type ProxyConfig struct {
}
type DockerConfig struct {
Registry string `yaml:"registry" default:"docker.io/kubeshark"`
Tag string `yaml:"tag" default:"latest"`
ImagePullPolicy string `yaml:"imagepullpolicy" default:"Always"`
Registry string `yaml:"registry" default:"docker.io/kubeshark"`
Tag string `yaml:"tag" default:"latest"`
ImagePullPolicy string `yaml:"imagepullpolicy" default:"Always"`
ImagePullSecrets []string `yaml:"imagepullsecrets"`
}
type ResourcesConfig struct {

View File

@@ -18,7 +18,7 @@ func FormatError(err error) error {
if k8serrors.IsForbidden(err) {
errorNew = fmt.Errorf("insufficient permissions: %w. "+
"supply the required permission or control %s's access to namespaces by setting %s "+
"in the config file or setting the targetted namespace with --%s %s=<NAMEPSACE>",
"in the config file or setting the targeted namespace with --%s %s=<NAMEPSACE>",
err,
misc.Software,
config.SelfNamespaceConfigName,

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0
github.com/google/go-github/v37 v37.0.0
github.com/kubeshark/base v0.5.0
github.com/kubeshark/base v0.5.1
github.com/rs/zerolog v1.28.0
github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5

4
go.sum
View File

@@ -414,8 +414,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubeshark/base v0.5.0 h1:ZQ/wNIdmLeDwy143korRZyPorcqBgf1y/tQIiIenAL0=
github.com/kubeshark/base v0.5.0/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/kubeshark/base v0.5.1 h1:msy1iQLgWQK1COoicwWxEDbeXU9J5RuptA5fYeOEzfA=
github.com/kubeshark/base v0.5.1/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=

View File

@@ -116,22 +116,22 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) {
}
}
func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) {
postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url)
func (connector *Connector) PostTargetedPodsToHub(pods []core.Pod) {
postTargetedUrl := fmt.Sprintf("%s/pods/targeted", connector.url)
if podsMarshalled, err := json.Marshal(pods); err != nil {
log.Error().Err(err).Msg("Failed to marshal the targetted pods:")
log.Error().Err(err).Msg("Failed to marshal the targeted pods:")
} else {
ok := false
for !ok {
if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
if _, err = utils.Post(postTargetedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:")
log.Debug().Err(err).Msg("Failed sending the targeted pods to Hub:")
} else {
ok = true
log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:")
log.Debug().Int("pod-count", len(pods)).Msg("Reported targeted pods to Hub:")
}
time.Sleep(time.Second)
}

View File

@@ -177,6 +177,7 @@ type PodOptions struct {
ServiceAccountName string
Resources Resources
ImagePullPolicy core.PullPolicy
ImagePullSecrets []core.LocalObjectReference
Debug bool
}
@@ -251,6 +252,7 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
Effect: core.TaintEffectNoSchedule,
},
},
ImagePullSecrets: opts.ImagePullSecrets,
},
}
@@ -353,6 +355,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
Effect: core.TaintEffectNoSchedule,
},
},
ImagePullSecrets: opts.ImagePullSecrets,
},
}
@@ -664,6 +667,7 @@ func (provider *Provider) ApplyWorkerDaemonSet(
serviceAccountName string,
resources Resources,
imagePullPolicy core.PullPolicy,
imagePullSecrets []core.LocalObjectReference,
serviceMesh bool,
tls bool,
debug bool,
@@ -812,6 +816,14 @@ func (provider *Provider) ApplyWorkerDaemonSet(
podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
podSpec.WithVolumes(procfsVolume, sysfsVolume)
if len(imagePullSecrets) > 0 {
localObjectReference := applyconfcore.LocalObjectReference()
for _, secret := range imagePullSecrets {
localObjectReference.WithName(secret.Name)
}
podSpec.WithImagePullSecrets(localObjectReference)
}
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{
"app": workerPodName,

View File

@@ -8,19 +8,19 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap {
nodeToTargettedPodsMap := make(models.NodeToPodsMap)
for _, pod := range targettedPods {
func GetNodeHostToTargetedPodsMap(targetedPods []core.Pod) models.NodeToPodsMap {
nodeToTargetedPodsMap := make(models.NodeToPodsMap)
for _, pod := range targetedPods {
minimizedPod := getMinimizedPod(pod)
existingList := nodeToTargettedPodsMap[pod.Spec.NodeName]
existingList := nodeToTargetedPodsMap[pod.Spec.NodeName]
if existingList == nil {
nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
nodeToTargetedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
} else {
nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod)
nodeToTargetedPodsMap[pod.Spec.NodeName] = append(nodeToTargetedPodsMap[pod.Spec.NodeName], minimizedPod)
}
}
return nodeToTargettedPodsMap
return nodeToTargetedPodsMap
}
func getMinimizedPod(fullPod core.Pod) core.Pod {

View File

@@ -17,23 +17,23 @@ import (
const updateWorkersDelay = 5 * time.Second
type TargettedPodChangeEvent struct {
type TargetedPodChangeEvent struct {
Added []v1.Pod
Removed []v1.Pod
}
// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
type WorkerSyncer struct {
startTime time.Time
context context.Context
CurrentlyTargettedPods []v1.Pod
config WorkerSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TargettedPodChangeEvent
WorkerPodsChanges chan *v1.Pod
ErrorOut chan K8sTapManagerError
nodeToTargettedPodMap models.NodeToPodsMap
targettedNodes []string
startTime time.Time
context context.Context
CurrentlyTargetedPods []v1.Pod
config WorkerSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TargetedPodChangeEvent
WorkerPodsChanges chan *v1.Pod
ErrorOut chan K8sTapManagerError
nodeToTargetedPodMap models.NodeToPodsMap
targetedNodes []string
}
type WorkerSyncerConfig struct {
@@ -42,6 +42,7 @@ type WorkerSyncerConfig struct {
SelfNamespace string
WorkerResources Resources
ImagePullPolicy v1.PullPolicy
ImagePullSecrets []v1.LocalObjectReference
SelfServiceAccountExists bool
ServiceMesh bool
Tls bool
@@ -50,17 +51,17 @@ type WorkerSyncerConfig struct {
func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
syncer := &WorkerSyncer{
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
context: ctx,
CurrentlyTargettedPods: make([]v1.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TargettedPodChangeEvent, 100),
WorkerPodsChanges: make(chan *v1.Pod, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
context: ctx,
CurrentlyTargetedPods: make([]v1.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TargetedPodChangeEvent, 100),
WorkerPodsChanges: make(chan *v1.Pod, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
}
if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil {
if err, _ := syncer.updateCurrentlyTargetedPods(); err != nil {
return nil, err
}
@@ -68,7 +69,7 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide
return nil, err
}
go syncer.watchPodsForTargetting()
go syncer.watchPodsForTargeting()
go syncer.watchWorkerEvents()
go syncer.watchWorkerPods()
return syncer, nil
@@ -178,12 +179,12 @@ func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
}
}
func (workerSyncer *WorkerSyncer) watchPodsForTargetting() {
func (workerSyncer *WorkerSyncer) watchPodsForTargeting() {
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
handleChangeInPods := func() {
err, changeFound := workerSyncer.updateCurrentlyTargettedPods()
err, changeFound := workerSyncer.updateCurrentlyTargetedPods()
if err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
@@ -298,22 +299,22 @@ func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorke
}
}
func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) {
func (workerSyncer *WorkerSyncer) updateCurrentlyTargetedPods() (err error, changesFound bool) {
if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
return err, false
} else {
podsToTarget := excludeSelfPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget)
addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargetedPods, podsToTarget)
for _, addedPod := range addedPods {
log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:")
log.Info().Str("pod", addedPod.Name).Msg("Currently targeting:")
}
for _, removedPod := range removedPods {
log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.")
log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targeting is stopped.")
}
if len(addedPods) > 0 || len(removedPods) > 0 {
workerSyncer.CurrentlyTargettedPods = podsToTarget
workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods)
workerSyncer.TapPodChangesOut <- TargettedPodChangeEvent{
workerSyncer.CurrentlyTargetedPods = podsToTarget
workerSyncer.nodeToTargetedPodMap = GetNodeHostToTargetedPodsMap(workerSyncer.CurrentlyTargetedPods)
workerSyncer.TapPodChangesOut <- TargetedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
}
@@ -324,14 +325,14 @@ func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, cha
}
func (workerSyncer *WorkerSyncer) updateWorkers() error {
nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap))
nodesToTarget := make([]string, len(workerSyncer.nodeToTargetedPodMap))
i := 0
for node := range workerSyncer.nodeToTargettedPodMap {
for node := range workerSyncer.nodeToTargetedPodMap {
nodesToTarget[i] = node
i++
}
if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) {
if utils.EqualStringSlices(nodesToTarget, workerSyncer.targetedNodes) {
log.Debug().Msg("Skipping apply, DaemonSet is up to date")
return nil
}
@@ -340,7 +341,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
image := docker.GetWorkerImage()
if len(workerSyncer.nodeToTargettedPodMap) > 0 {
if len(workerSyncer.nodeToTargetedPodMap) > 0 {
var serviceAccountName string
if workerSyncer.config.SelfServiceAccountExists {
serviceAccountName = ServiceAccountName
@@ -348,8 +349,8 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
serviceAccountName = ""
}
nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap))
for nodeName := range workerSyncer.nodeToTargettedPodMap {
nodeNames := make([]string, 0, len(workerSyncer.nodeToTargetedPodMap))
for nodeName := range workerSyncer.nodeToTargetedPodMap {
nodeNames = append(nodeNames, nodeName)
}
@@ -363,13 +364,14 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
serviceAccountName,
workerSyncer.config.WorkerResources,
workerSyncer.config.ImagePullPolicy,
workerSyncer.config.ImagePullSecrets,
workerSyncer.config.ServiceMesh,
workerSyncer.config.Tls,
workerSyncer.config.Debug); err != nil {
return err
}
log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.")
log.Debug().Int("worker-count", len(workerSyncer.nodeToTargetedPodMap)).Msg("Successfully created workers.")
} else {
if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
workerSyncer.context,
@@ -383,7 +385,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
log.Debug().Msg("Successfully resetted Worker DaemonSet")
}
workerSyncer.targettedNodes = nodesToTarget
workerSyncer.targetedNodes = nodesToTarget
return nil
}

View File

@@ -2,6 +2,7 @@ package main
import (
"os"
"strconv"
"time"
"github.com/kubeshark/kubeshark/cmd"
@@ -11,6 +12,20 @@ import (
func main() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
// Short caller (file:line)
zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string {
short := file
for i := len(file) - 1; i > 0; i-- {
if file[i] == '/' {
short = file[i+1:]
break
}
}
file = short
return file + ":" + strconv.Itoa(line)
}
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Caller().Logger()
cmd.Execute()
}

View File

@@ -13,7 +13,7 @@ import (
core "k8s.io/api/core/v1"
)
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) {
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, imagePullSecrets []core.LocalObjectReference, debug bool) (bool, error) {
if !isNsRestrictedMode {
if err := createSelfNamespace(ctx, kubernetesProvider, selfNamespace); err != nil {
return false, err
@@ -39,6 +39,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
ServiceAccountName: serviceAccountName,
Resources: hubResources,
ImagePullPolicy: imagePullPolicy,
ImagePullSecrets: imagePullSecrets,
Debug: debug,
}
@@ -49,6 +50,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
ServiceAccountName: serviceAccountName,
Resources: hubResources,
ImagePullPolicy: imagePullPolicy,
ImagePullSecrets: imagePullSecrets,
Debug: debug,
}