From b1ad2efb96d7a2f21fe5469521853f05246dcf7f Mon Sep 17 00:00:00 2001 From: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> Date: Mon, 22 Nov 2021 15:30:10 +0200 Subject: [PATCH] Warn pods not starting (#493) Print warning event related to mizu k8s resources. In non-daemon print to CLI. In Daemon print to API-Server logs. --- agent/main.go | 100 ++++++++++++++++-- cli/cmd/tapRunner.go | 88 ++++++++++++++- .../permissions-all-namespaces-daemon.yaml | 3 + ...-all-namespaces-without-ip-resolution.yaml | 3 + .../roles/permissions-all-namespaces.yaml | 3 + examples/roles/permissions-ns-daemon.yaml | 3 + .../roles/permissions-ns-with-validation.yaml | 3 + .../permissions-ns-without-ip-resolution.yaml | 3 + examples/roles/permissions-ns.yaml | 3 + shared/kubernetes/eventWatchHelper.go | 45 ++++++++ shared/kubernetes/podWatchHelper.go | 8 +- shared/kubernetes/provider.go | 5 + shared/kubernetes/watch.go | 7 +- shared/kubernetes/watchEvent.go | 28 ++++- 14 files changed, 285 insertions(+), 17 deletions(-) create mode 100644 shared/kubernetes/eventWatchHelper.go diff --git a/agent/main.go b/agent/main.go index 5866f670e..c3a87fe69 100644 --- a/agent/main.go +++ b/agent/main.go @@ -22,6 +22,7 @@ import ( "path" "path/filepath" "plugin" + "regexp" "sort" "syscall" "time" @@ -264,9 +265,16 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if _, err := startMizuTapperSyncer(ctx); err != nil { + kubernetesProvider, err := kubernetes.NewProviderInCluster() + if err != nil { + logger.Log.Fatalf("error creating k8s provider: %+v", err) + } + + if _, err := startMizuTapperSyncer(ctx, kubernetesProvider); err != nil { logger.Log.Fatalf("error initializing tapper syncer: %+v", err) } + + go watchMizuEvents(ctx, kubernetesProvider, cancel) } utils.StartServer(app) @@ -426,12 +434,7 @@ func dialSocketWithRetry(socketAddress string, retryAmount int, retryDelay time. 
return nil, lastErr } -func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, error) { - provider, err := kubernetes.NewProviderInCluster() - if err != nil { - return nil, err - } - +func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) (*kubernetes.MizuTapperSyncer, error) { tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ TargetNamespaces: config.Config.TargetNamespaces, PodFilterRegex: config.Config.TapTargetRegex.Regexp, @@ -483,3 +486,95 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e return tapperSyncer, nil } + +// watchMizuEvents watches k8s events in config.Config.MizuResourcesNamespace whose names start with +// kubernetes.MizuResourcesPrefix and logs the warning events created since this function started. +// It calls cancel on watch errors and on objects that cannot be parsed as events, and returns when ctx is done. +func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { + // Round down because k8s CreationTimestamp is given in 1 sec resolution. + startTime := time.Now().Truncate(time.Second) + + mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix)) + eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex) + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) + + for { + select { + case wEvent, ok := <-added: + if !ok { + added = nil + continue + } + + event, err := wEvent.ToEvent() + if err != nil { + logger.Log.Errorf("error parsing Mizu resource added event: %+v", err) + cancel() + // event is nil when parsing fails, so skip it instead of dereferencing it below. + continue + } + + if startTime.After(event.CreationTimestamp.Time) { + continue + } + + if event.Type == v1.EventTypeWarning { + logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) + } + case wEvent, ok := <-removed: + if !ok { + removed = nil + continue + } + + event, err := wEvent.ToEvent() + if err != nil { + logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err) + cancel() + // event is nil when parsing fails, so skip it instead of dereferencing it below. + continue + } + + if startTime.After(event.CreationTimestamp.Time) { + continue + } + + if 
event.Type == v1.EventTypeWarning { + logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) + } + case wEvent, ok := <-modified: + if !ok { + modified = nil + continue + } + + event, err := wEvent.ToEvent() + if err != nil { + logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err) + cancel() + // event is nil when parsing fails, so skip it instead of dereferencing it below. + continue + } + + if startTime.After(event.CreationTimestamp.Time) { + continue + } + + if event.Type == v1.EventTypeWarning { + logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) + } + case err, ok := <-errorChan: + if !ok { + errorChan = nil + continue + } + + logger.Log.Errorf("error in watch mizu resource events loop: %+v", err) + cancel() + + case <-ctx.Done(): + logger.Log.Debugf("watching Mizu resource events loop, ctx done") + return + } + } +} diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 589112168..17b5d76d3 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -42,6 +42,8 @@ var state tapState var apiProvider *apiserver.Provider func RunMizuTap() { + startTime := time.Now() + mizuApiFilteringOptions, err := getMizuApiFilteringOptions() apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout) if err != nil { @@ -150,6 +152,7 @@ func RunMizuTap() { go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel) go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel) + go goUtils.HandleExcWrapper(watchMizuEvents, ctx, kubernetesProvider, cancel, startTime) // block until exit signal or error waitForFinish(ctx, cancel) @@ -727,7 +730,7 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider continue } - logger.Log.Errorf("[Error] Error in mizu tapper watch, err: %v", err) + logger.Log.Errorf("[Error] Error in mizu tapper pod watch, err: %v", err) cancel() case <-ctx.Done(): @@ -737,6 +740,98 @@ func watchTapperPod(ctx
context.Context, kubernetesProvider *kubernetes.Provider } } +// watchMizuEvents watches k8s events in config.Config.MizuResourcesNamespace whose names start with +// kubernetes.MizuResourcesPrefix and logs the warning events created at or after startTime. +// It calls cancel on watch errors and on objects that cannot be parsed as events, and returns when ctx is done. +func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, startTime time.Time) { + // Round down because k8s CreationTimestamp is given in 1 sec resolution. + startTime = startTime.Truncate(time.Second) + + mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix)) + eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex) + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) + + for { + select { + case wEvent, ok := <-added: + if !ok { + added = nil + continue + } + + event, err := wEvent.ToEvent() + if err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err)) + cancel() + // event is nil when parsing fails, so skip it instead of dereferencing it below. + continue + } + + if startTime.After(event.CreationTimestamp.Time) { + continue + } + + if event.Type == core.EventTypeWarning { + logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)) + } + case wEvent, ok := <-removed: + if !ok { + removed = nil + continue + } + + event, err := wEvent.ToEvent() + if err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err)) + cancel() + // event is nil when parsing fails, so skip it instead of dereferencing it below. + continue + } + + if startTime.After(event.CreationTimestamp.Time) { + continue + } + + if event.Type == core.EventTypeWarning { + logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)) + } + case wEvent, ok := <-modified: + if !ok { + modified = nil + continue + } + + event, err := wEvent.ToEvent() + if err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err)) + cancel() + // event is nil when parsing fails, so skip it instead of dereferencing it below. + continue + } + + if startTime.After(event.CreationTimestamp.Time) { + continue + } + + if 
event.Type == core.EventTypeWarning { + logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)) + } + case err, ok := <-errorChan: + if !ok { + errorChan = nil + continue + } + + logger.Log.Errorf("error in watch mizu resource events loop: %+v", err) + cancel() + + case <-ctx.Done(): + logger.Log.Debugf("watching Mizu resource events loop, ctx done") + return + } + } +} + func getNamespaces(kubernetesProvider *kubernetes.Provider) []string { if config.Config.Tap.AllNamespaces { return []string{kubernetes.K8sAllNamespaces} diff --git a/examples/roles/permissions-all-namespaces-daemon.yaml b/examples/roles/permissions-all-namespaces-daemon.yaml index 3cbc16a13..99d36110e 100644 --- a/examples/roles/permissions-all-namespaces-daemon.yaml +++ b/examples/roles/permissions-all-namespaces-daemon.yaml @@ -49,6 +49,9 @@ rules: - apiGroups: ["", "apps", "extensions"] resources: ["endpoints"] verbs: ["get", "list", "watch"] + - apiGroups: ["events.k8s.io"] + resources: ["events"] + verbs: ["list", "watch"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/examples/roles/permissions-all-namespaces-without-ip-resolution.yaml b/examples/roles/permissions-all-namespaces-without-ip-resolution.yaml index bced278e9..0a743e776 100644 --- a/examples/roles/permissions-all-namespaces-without-ip-resolution.yaml +++ b/examples/roles/permissions-all-namespaces-without-ip-resolution.yaml @@ -23,6 +23,9 @@ rules: - apiGroups: [""] resources: ["configmaps"] verbs: ["get", "create", "delete"] + - apiGroups: ["events.k8s.io"] + resources: ["events"] + verbs: ["list", "watch"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/examples/roles/permissions-all-namespaces.yaml b/examples/roles/permissions-all-namespaces.yaml index 097d276e0..7b8b065ee 100644 --- a/examples/roles/permissions-all-namespaces.yaml +++ b/examples/roles/permissions-all-namespaces.yaml 
@@ -46,6 +46,9 @@ rules: - apiGroups: ["", "apps", "extensions"] resources: ["endpoints"] verbs: ["get", "list", "watch"] +- apiGroups: ["events.k8s.io"] + resources: ["events"] + verbs: ["list", "watch"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/examples/roles/permissions-ns-daemon.yaml b/examples/roles/permissions-ns-daemon.yaml index 589470678..0ab880f11 100644 --- a/examples/roles/permissions-ns-daemon.yaml +++ b/examples/roles/permissions-ns-daemon.yaml @@ -41,6 +41,9 @@ rules: - apiGroups: ["", "apps", "extensions"] resources: ["endpoints"] verbs: ["get", "list", "watch"] +- apiGroups: ["events.k8s.io"] + resources: ["events"] + verbs: ["list", "watch"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/examples/roles/permissions-ns-with-validation.yaml b/examples/roles/permissions-ns-with-validation.yaml index e2d6863ec..a3e3eceb2 100644 --- a/examples/roles/permissions-ns-with-validation.yaml +++ b/examples/roles/permissions-ns-with-validation.yaml @@ -38,6 +38,9 @@ rules: - apiGroups: ["", "apps", "extensions"] resources: ["endpoints"] verbs: ["get", "list", "watch"] +- apiGroups: ["events.k8s.io"] + resources: ["events"] + verbs: ["list", "watch"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/examples/roles/permissions-ns-without-ip-resolution.yaml b/examples/roles/permissions-ns-without-ip-resolution.yaml index ef14933a1..24bc0d822 100644 --- a/examples/roles/permissions-ns-without-ip-resolution.yaml +++ b/examples/roles/permissions-ns-without-ip-resolution.yaml @@ -20,6 +20,9 @@ rules: - apiGroups: [""] resources: ["configmaps"] verbs: ["get", "create", "delete"] +- apiGroups: ["events.k8s.io"] + resources: ["events"] + verbs: ["list", "watch"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/examples/roles/permissions-ns.yaml b/examples/roles/permissions-ns.yaml index 3af89afa0..6974ab50f 100644 --- 
a/examples/roles/permissions-ns.yaml +++ b/examples/roles/permissions-ns.yaml @@ -38,6 +38,9 @@ rules: - apiGroups: ["", "apps", "extensions"] resources: ["endpoints"] verbs: ["get", "list", "watch"] +- apiGroups: ["events.k8s.io"] + resources: ["events"] + verbs: ["list", "watch"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/shared/kubernetes/eventWatchHelper.go b/shared/kubernetes/eventWatchHelper.go new file mode 100644 index 000000000..3ec3cb956 --- /dev/null +++ b/shared/kubernetes/eventWatchHelper.go @@ -0,0 +1,45 @@ +package kubernetes + +import ( + "context" + "regexp" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" +) + +// EventWatchHelper creates k8s event watchers and filters the watched events by name. +type EventWatchHelper struct { + kubernetesProvider *Provider + NameRegexFilter *regexp.Regexp +} + +// NewEventWatchHelper returns an EventWatchHelper that uses kubernetesProvider and NameRegexFilter. +func NewEventWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *EventWatchHelper { + helper := &EventWatchHelper{} + helper.kubernetesProvider = kubernetesProvider + helper.NameRegexFilter = NameRegexFilter + return helper +} + +// Filter implements the EventFilterer interface. It reports whether wEvent holds an event whose +// name matches NameRegexFilter; objects that are not events are filtered out without an error. +func (wh *EventWatchHelper) Filter(wEvent *WatchEvent) (bool, error) { + k8sEvent, convErr := wEvent.ToEvent() + if convErr != nil { + return false, nil + } + + return wh.NameRegexFilter.MatchString(k8sEvent.Name), nil +} + +// NewWatcher implements the WatchCreator interface. It opens a watch on the events of namespace. +func (wh *EventWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) { + eventsClient := wh.kubernetesProvider.clientSet.EventsV1().Events(namespace) + eventWatcher, watchErr := eventsClient.Watch(ctx, metav1.ListOptions{Watch: true}) + if watchErr != nil { + return nil, watchErr + } + + return eventWatcher, nil +} diff --git a/shared/kubernetes/podWatchHelper.go b/shared/kubernetes/podWatchHelper.go index 2184eeb43..771964466 100644 --- a/shared/kubernetes/podWatchHelper.go +++ b/shared/kubernetes/podWatchHelper.go @@ -21,13 +21,13 @@ func NewPodWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Reg } // Implements the EventFilterer
Interface -func (pwh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) { +func (wh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) { pod, err := wEvent.ToPod() if err != nil { return false, nil } - if !pwh.NameRegexFilter.MatchString(pod.Name) { + if !wh.NameRegexFilter.MatchString(pod.Name) { return false, nil } @@ -35,8 +35,8 @@ func (pwh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) { } // Implements the WatchCreator Interface -func (pwh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) { - watcher, err := pwh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true}) +func (wh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) { + watcher, err := wh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true}) if err != nil { return nil, err } diff --git a/shared/kubernetes/provider.go b/shared/kubernetes/provider.go index 649444a49..0c024ab47 100644 --- a/shared/kubernetes/provider.go +++ b/shared/kubernetes/provider.go @@ -485,6 +485,11 @@ func (provider *Provider) CreateDaemonsetRBAC(ctx context.Context, namespace str Resources: []string{"daemonsets"}, Verbs: []string{"patch", "get", "list", "create", "delete"}, }, + { + APIGroups: []string{"events.k8s.io"}, + Resources: []string{"events"}, + Verbs: []string{"list", "watch"}, + }, }, } roleBinding := &rbac.RoleBinding{ diff --git a/shared/kubernetes/watch.go b/shared/kubernetes/watch.go index 7e28e152a..a9588aa19 100644 --- a/shared/kubernetes/watch.go +++ b/shared/kubernetes/watch.go @@ -9,7 +9,6 @@ import ( "sync" "time" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/watch" ) @@ -39,7 +38,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames for { watcher, err := watcherCreator.NewWatcher(ctx, targetNamespace) if err != nil { - errorChan <- fmt.Errorf("error in k8 
watch: %v", err) + errorChan <- fmt.Errorf("error in k8s watch: %v", err) break } @@ -54,7 +53,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames } if err != nil { - errorChan <- fmt.Errorf("error in k8 watch: %v", err) + errorChan <- fmt.Errorf("error in k8s watch: %v", err) break } else { if !watchRestartDebouncer.IsOn() { @@ -95,7 +94,7 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer Event wEvent := WatchEvent(e) if wEvent.Type == watch.Error { - return apierrors.FromObject(wEvent.Object) + return wEvent.ToError() } if pass, err := filterer.Filter(&wEvent); err != nil { diff --git a/shared/kubernetes/watchEvent.go b/shared/kubernetes/watchEvent.go index fc1b7e11f..ed07e4cf1 100644 --- a/shared/kubernetes/watchEvent.go +++ b/shared/kubernetes/watchEvent.go @@ -2,17 +2,45 @@ package kubernetes import ( "fmt" + "reflect" + corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/watch" ) +// InvalidObjectType is the error returned when a watch event's object is not of the requested type. +type InvalidObjectType struct { + RequestedType reflect.Type +} + +// Error implements the error interface. +func (iot *InvalidObjectType) Error() string { + return fmt.Sprintf("Cannot convert event to type %s", iot.RequestedType) +} + type WatchEvent watch.Event func (we *WatchEvent) ToPod() (*corev1.Pod, error) { pod, ok := we.Object.(*corev1.Pod) if !ok { - return nil, fmt.Errorf("Invalid object type on pod event stream") + return nil, &InvalidObjectType{RequestedType: reflect.TypeOf(pod)} } return pod, nil } + +// ToEvent returns the watch event's object as an events/v1 Event, or an InvalidObjectType error if it is not one. +func (we *WatchEvent) ToEvent() (*eventsv1.Event, error) { + if event, ok := we.Object.(*eventsv1.Event); ok { + return event, nil + } + + return nil, &InvalidObjectType{RequestedType: reflect.TypeOf((*eventsv1.Event)(nil))} +} + +// ToError converts the watch event's object to an error using apierrors.FromObject. +func (we *WatchEvent) ToError() error { + return apierrors.FromObject(we.Object) +}