Warn pods not starting (#493)

Print warning event related to mizu k8s resources.
In non-daemon print to CLI. In Daemon print to API-Server logs.
This commit is contained in:
Nimrod Gilboa Markevich 2021-11-22 15:30:10 +02:00 committed by GitHub
parent ed7b754eca
commit b1ad2efb96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 285 additions and 17 deletions

View File

@ -22,6 +22,7 @@ import (
"path"
"path/filepath"
"plugin"
"regexp"
"sort"
"syscall"
"time"
@ -264,9 +265,16 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if _, err := startMizuTapperSyncer(ctx); err != nil {
kubernetesProvider, err := kubernetes.NewProviderInCluster()
if err != nil {
logger.Log.Fatalf("error creating k8s provider: %+v", err)
}
if _, err := startMizuTapperSyncer(ctx, kubernetesProvider); err != nil {
logger.Log.Fatalf("error initializing tapper syncer: %+v", err)
}
go watchMizuEvents(ctx, kubernetesProvider, cancel)
}
utils.StartServer(app)
@ -426,12 +434,7 @@ func dialSocketWithRetry(socketAddress string, retryAmount int, retryDelay time.
return nil, lastErr
}
func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, error) {
provider, err := kubernetes.NewProviderInCluster()
if err != nil {
return nil, err
}
func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) (*kubernetes.MizuTapperSyncer, error) {
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
TargetNamespaces: config.Config.TargetNamespaces,
PodFilterRegex: config.Config.TapTargetRegex.Regexp,
@ -483,3 +486,86 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
return tapperSyncer, nil
}
func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
// Round down because k8s CreationTimestamp is given in 1 sec resolution.
startTime := time.Now().Truncate(time.Second)
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource added event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Errorf("error in watch mizu resource events loop: %+v", err)
cancel()
case <-ctx.Done():
logger.Log.Debugf("watching Mizu resource events loop, ctx done")
return
}
}
}

View File

@ -42,6 +42,8 @@ var state tapState
var apiProvider *apiserver.Provider
func RunMizuTap() {
startTime := time.Now()
mizuApiFilteringOptions, err := getMizuApiFilteringOptions()
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err != nil {
@ -150,6 +152,7 @@ func RunMizuTap() {
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchMizuEvents, ctx, kubernetesProvider, cancel, startTime)
// block until exit signal or error
waitForFinish(ctx, cancel)
@ -727,7 +730,7 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
continue
}
logger.Log.Errorf("[Error] Error in mizu tapper watch, err: %v", err)
logger.Log.Errorf("[Error] Error in mizu tapper pod watch, err: %v", err)
cancel()
case <-ctx.Done():
@ -737,6 +740,89 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
}
}
func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, startTime time.Time) {
// Round down because k8s CreationTimestamp is given in 1 sec resolution.
startTime = startTime.Truncate(time.Second)
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Errorf("error in watch mizu resource events loop: %+v", err)
cancel()
case <-ctx.Done():
logger.Log.Debugf("watching Mizu resource events loop, ctx done")
return
}
}
}
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
if config.Config.Tap.AllNamespaces {
return []string{kubernetes.K8sAllNamespaces}

View File

@ -49,6 +49,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@ -23,6 +23,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@ -46,6 +46,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@ -41,6 +41,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@ -38,6 +38,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@ -20,6 +20,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@ -38,6 +38,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@ -0,0 +1,45 @@
package kubernetes
import (
"context"
"regexp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)
type EventWatchHelper struct {
kubernetesProvider *Provider
NameRegexFilter *regexp.Regexp
}
func NewEventWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *EventWatchHelper {
return &EventWatchHelper{
kubernetesProvider: kubernetesProvider,
NameRegexFilter: NameRegexFilter,
}
}
// Implements the EventFilterer Interface
func (wh *EventWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
event, err := wEvent.ToEvent()
if err != nil {
return false, nil
}
if !wh.NameRegexFilter.MatchString(event.Name) {
return false, nil
}
return true, nil
}
// Implements the WatchCreator Interface
func (wh *EventWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
watcher, err := wh.kubernetesProvider.clientSet.EventsV1().Events(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return nil, err
}
return watcher, nil
}

View File

@ -21,13 +21,13 @@ func NewPodWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Reg
}
// Implements the EventFilterer Interface
func (pwh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
func (wh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
pod, err := wEvent.ToPod()
if err != nil {
return false, nil
}
if !pwh.NameRegexFilter.MatchString(pod.Name) {
if !wh.NameRegexFilter.MatchString(pod.Name) {
return false, nil
}
@ -35,8 +35,8 @@ func (pwh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
}
// Implements the WatchCreator Interface
func (pwh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
watcher, err := pwh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
func (wh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
watcher, err := wh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return nil, err
}

View File

@ -485,6 +485,11 @@ func (provider *Provider) CreateDaemonsetRBAC(ctx context.Context, namespace str
Resources: []string{"daemonsets"},
Verbs: []string{"patch", "get", "list", "create", "delete"},
},
{
APIGroups: []string{"events.k8s.io"},
Resources: []string{"events"},
Verbs: []string{"list", "watch"},
},
},
}
roleBinding := &rbac.RoleBinding{

View File

@ -9,7 +9,6 @@ import (
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
)
@ -39,7 +38,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
for {
watcher, err := watcherCreator.NewWatcher(ctx, targetNamespace)
if err != nil {
errorChan <- fmt.Errorf("error in k8 watch: %v", err)
errorChan <- fmt.Errorf("error in k8s watch: %v", err)
break
}
@ -54,7 +53,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
}
if err != nil {
errorChan <- fmt.Errorf("error in k8 watch: %v", err)
errorChan <- fmt.Errorf("error in k8s watch: %v", err)
break
} else {
if !watchRestartDebouncer.IsOn() {
@ -95,7 +94,7 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer Event
wEvent := WatchEvent(e)
if wEvent.Type == watch.Error {
return apierrors.FromObject(wEvent.Object)
return wEvent.ToError()
}
if pass, err := filterer.Filter(&wEvent); err != nil {

View File

@ -2,17 +2,43 @@ package kubernetes
import (
"fmt"
"reflect"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
)
type InvalidObjectType struct {
RequestedType reflect.Type
}
// Implements the error interface
func (iot *InvalidObjectType) Error() string {
return fmt.Sprintf("Cannot convert event to type %s", iot.RequestedType)
}
type WatchEvent watch.Event
func (we *WatchEvent) ToPod() (*corev1.Pod, error) {
pod, ok := we.Object.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("Invalid object type on pod event stream")
return nil, &InvalidObjectType{RequestedType: reflect.TypeOf(pod)}
}
return pod, nil
}
func (we *WatchEvent) ToEvent() (*eventsv1.Event, error) {
event, ok := we.Object.(*eventsv1.Event)
if !ok {
return nil, &InvalidObjectType{RequestedType: reflect.TypeOf(event)}
}
return event, nil
}
func (we *WatchEvent) ToError() error {
return apierrors.FromObject(we.Object)
}