mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-21 10:06:58 +00:00
Update tap targets over ws (#901)
Update tappers via websocket instead of by env var. This way the DaemonSet doesn't have to be applied just to notify the tappers that the tap targets changed. The number of tapper restarts is reduced. The DaemonSet still gets applied when there is a need to add/remove a tapper from a node.
This commit is contained in:
committed by
GitHub
parent
41a7587088
commit
a5c35d7d90
@@ -31,7 +31,8 @@ type MizuTapperSyncer struct {
|
||||
TapPodChangesOut chan TappedPodChangeEvent
|
||||
TapperStatusChangedOut chan shared.TapperStatus
|
||||
ErrorOut chan K8sTapManagerError
|
||||
nodeToTappedPodMap map[string][]core.Pod
|
||||
nodeToTappedPodMap shared.NodeToPodsMap
|
||||
tappedNodes []string
|
||||
}
|
||||
|
||||
type TapperSyncerConfig struct {
|
||||
@@ -177,7 +178,7 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
|
||||
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
|
||||
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
|
||||
|
||||
restartTappers := func() {
|
||||
handleChangeInPods := func() {
|
||||
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
|
||||
if err != nil {
|
||||
tapperSyncer.ErrorOut <- K8sTapManagerError{
|
||||
@@ -197,7 +198,7 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
|
||||
}
|
||||
}
|
||||
}
|
||||
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
|
||||
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, handleChangeInPods)
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -295,6 +296,20 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
|
||||
}
|
||||
|
||||
func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
nodesToTap := make([]string, len(tapperSyncer.nodeToTappedPodMap))
|
||||
i := 0
|
||||
for node := range tapperSyncer.nodeToTappedPodMap {
|
||||
nodesToTap[i] = node
|
||||
i++
|
||||
}
|
||||
|
||||
if shared.EqualStringSlices(nodesToTap, tapperSyncer.tappedNodes) {
|
||||
logger.Log.Debug("Skipping apply, DaemonSet is up to date")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Updating DaemonSet to run on nodes: %v", nodesToTap)
|
||||
|
||||
if len(tapperSyncer.nodeToTappedPodMap) > 0 {
|
||||
var serviceAccountName string
|
||||
if tapperSyncer.config.MizuServiceAccountExists {
|
||||
@@ -303,6 +318,11 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
serviceAccountName = ""
|
||||
}
|
||||
|
||||
nodeNames := make([]string, 0, len(tapperSyncer.nodeToTappedPodMap))
|
||||
for nodeName := range tapperSyncer.nodeToTappedPodMap {
|
||||
nodeNames = append(nodeNames, nodeName)
|
||||
}
|
||||
|
||||
if err := tapperSyncer.kubernetesProvider.ApplyMizuTapperDaemonSet(
|
||||
tapperSyncer.context,
|
||||
tapperSyncer.config.MizuResourcesNamespace,
|
||||
@@ -310,7 +330,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
tapperSyncer.config.AgentImage,
|
||||
TapperPodName,
|
||||
fmt.Sprintf("%s.%s.svc.cluster.local", ApiServerPodName, tapperSyncer.config.MizuResourcesNamespace),
|
||||
tapperSyncer.nodeToTappedPodMap,
|
||||
nodeNames,
|
||||
serviceAccountName,
|
||||
tapperSyncer.config.TapperResources,
|
||||
tapperSyncer.config.ImagePullPolicy,
|
||||
@@ -335,5 +355,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
logger.Log.Debugf("Successfully reset tapper daemon set")
|
||||
}
|
||||
|
||||
tapperSyncer.tappedNodes = nodesToTap
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user