Use github.com/rs/zerolog for logging (#1255)

* Use `github.com/rs/zerolog` for logging

* Use `github.com/rs/zerolog` for logging (continue)

* Add `debug` flag

* Remove `github.com/op/go-logging` dependency completely

* Fix linter
This commit is contained in:
M. Mert Yildiran
2022-11-28 16:48:20 -08:00
committed by GitHub
parent 10a9b5a3d7
commit 86fd616b84
36 changed files with 511 additions and 318 deletions

View File

@@ -3,7 +3,6 @@ package kubernetes
import (
"context"
"fmt"
"log"
"regexp"
"time"
@@ -11,7 +10,8 @@ import (
"github.com/kubeshark/kubeshark/utils"
"github.com/kubeshark/worker/api"
"github.com/kubeshark/worker/models"
"github.com/op/go-logging"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
)
@@ -42,7 +42,7 @@ type TapperSyncerConfig struct {
KubesharkResourcesNamespace string
TapperResources models.Resources
ImagePullPolicy core.PullPolicy
LogLevel logging.Level
LogLevel zerolog.Level
KubesharkApiFilteringOptions api.TrafficFilteringOptions
KubesharkServiceAccountExists bool
ServiceMesh bool
@@ -91,11 +91,15 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() {
pod, err := wEvent.ToPod()
if err != nil {
log.Printf("[ERROR] parsing Kubeshark resource pod: %+v", err)
log.Error().Str("pod", TapperPodName).Err(err).Msg("While parsing Kubeshark resource!")
continue
}
log.Printf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase)
log.Debug().
Str("pod", pod.Name).
Str("node", pod.Spec.NodeName).
Interface("phase", pod.Status.Phase).
Msg("Watching pod events...")
if pod.Spec.NodeName != "" {
tapperStatus := models.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
tapperSyncer.TapperStatusChangedOut <- tapperStatus
@@ -106,10 +110,12 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() {
errorChan = nil
continue
}
log.Printf("[ERROR] Watching tapper pods loop, error: %+v", err)
log.Error().Str("pod", TapperPodName).Err(err).Msg("While watching pod!")
case <-tapperSyncer.context.Done():
log.Printf("Watching tapper pods loop, ctx done")
log.Debug().
Str("pod", TapperPodName).
Msg("Watching pod, context done.")
return
}
}
@@ -130,23 +136,26 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() {
event, err := wEvent.ToEvent()
if err != nil {
log.Printf("[ERROR] parsing Kubeshark resource event: %+v", err)
log.Error().
Str("pod", TapperPodName).
Err(err).
Msg("Parsing resource event.")
continue
}
log.Printf(
"Watching tapper events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
event.Name,
event.CreationTimestamp.Time,
event.Regarding.Name,
event.Regarding.Kind,
event.Reason,
event.Note,
)
log.Debug().
Str("pod", TapperPodName).
Str("event", event.Name).
Time("time", event.CreationTimestamp.Time).
Str("name", event.Regarding.Name).
Str("kind", event.Regarding.Kind).
Str("reason", event.Reason).
Str("note", event.Note).
Msg("Watching events.")
pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.KubesharkResourcesNamespace, event.Regarding.Name)
if err1 != nil {
log.Printf("Couldn't get tapper pod %s", event.Regarding.Name)
log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod")
continue
}
@@ -166,10 +175,15 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() {
continue
}
log.Printf("[ERROR] Watching tapper events loop, error: %+v", err)
log.Error().
Str("pod", TapperPodName).
Err(err).
Msg("While watching events.")
case <-tapperSyncer.context.Done():
log.Printf("Watching tapper events loop, ctx done")
log.Debug().
Str("pod", TapperPodName).
Msg("Watching pod events, context done.")
return
}
}
@@ -189,7 +203,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
}
if !changeFound {
log.Printf("Nothing changed update tappers not needed")
log.Debug().Msg("Nothing changed. Updating tappers is not needed.")
return
}
if err := tapperSyncer.updateKubesharkTappers(); err != nil {
@@ -217,17 +231,37 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
switch wEvent.Type {
case EventAdded:
log.Printf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Added matching pod.")
if err := restartTappersDebouncer.SetOn(); err != nil {
log.Print(err)
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting tappers!")
}
case EventDeleted:
log.Printf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Removed matching pod.")
if err := restartTappersDebouncer.SetOn(); err != nil {
log.Print(err)
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting tappers!")
}
case EventModified:
log.Printf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Str("ip", pod.Status.PodIP).
Interface("phase", pod.Status.Phase).
Msg("Modified matching pod.")
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
@@ -236,7 +270,11 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
if err := restartTappersDebouncer.SetOn(); err != nil {
log.Print(err)
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting tappers!")
}
}
case EventBookmark:
@@ -254,7 +292,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
continue
case <-tapperSyncer.context.Done():
log.Printf("Watching pods loop, context done, stopping `restart tappers debouncer`")
log.Debug().Msg("Watching pods, context done. Stopping \"restart tappers debouncer\"")
restartTappersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
return
@@ -263,7 +301,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
}
func (tapperSyncer *KubesharkTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) {
log.Printf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart tappers debouncer\"")
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
@@ -278,10 +316,10 @@ func (tapperSyncer *KubesharkTapperSyncer) updateCurrentlyTappedPods() (err erro
podsToTap := excludeKubesharkPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(tapperSyncer.CurrentlyTappedPods, podsToTap)
for _, addedPod := range addedPods {
log.Printf("tapping new pod %s", addedPod.Name)
log.Info().Str("pod", addedPod.Name).Msg("Tapping new pod.")
}
for _, removedPod := range removedPods {
log.Printf("pod %s is no longer running, tapping for it stopped", removedPod.Name)
log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Tapping is stopped.")
}
if len(addedPods) > 0 || len(removedPods) > 0 {
tapperSyncer.CurrentlyTappedPods = podsToTap
@@ -305,11 +343,11 @@ func (tapperSyncer *KubesharkTapperSyncer) updateKubesharkTappers() error {
}
if utils.EqualStringSlices(nodesToTap, tapperSyncer.tappedNodes) {
log.Print("Skipping apply, DaemonSet is up to date")
log.Debug().Msg("Skipping apply, DaemonSet is up to date")
return nil
}
log.Printf("Updating DaemonSet to run on nodes: %v", nodesToTap)
log.Debug().Strs("nodes", nodesToTap).Msg("Updating DaemonSet to run on nodes.")
image := "kubeshark/worker:latest"
@@ -345,7 +383,7 @@ func (tapperSyncer *KubesharkTapperSyncer) updateKubesharkTappers() error {
return err
}
log.Printf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodMap))
log.Debug().Int("tapper-count", len(tapperSyncer.nodeToTappedPodMap)).Msg("Successfully created tappers.")
} else {
if err := tapperSyncer.kubernetesProvider.ResetKubesharkTapperDaemonSet(
tapperSyncer.context,
@@ -356,7 +394,7 @@ func (tapperSyncer *KubesharkTapperSyncer) updateKubesharkTappers() error {
return err
}
log.Printf("Successfully reset tapper daemon set")
log.Debug().Msg("Successfully reset tapper daemon set")
}
tapperSyncer.tappedNodes = nodesToTap

View File

@@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"log"
"net/url"
"path/filepath"
"regexp"
@@ -17,7 +16,8 @@ import (
"github.com/kubeshark/kubeshark/utils"
"github.com/kubeshark/worker/api"
"github.com/kubeshark/worker/models"
"github.com/op/go-logging"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
auth "k8s.io/api/authorization/v1"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
@@ -78,7 +78,11 @@ func NewProvider(kubeConfigPath string, contextName string) (*Provider, error) {
"you can set alternative kube config file path by adding the kube-config-path field to the kubeshark config file, err: %w", kubeConfigPath, err)
}
log.Printf("K8s client config, host: %s, api path: %s, user agent: %s", restClientConfig.Host, restClientConfig.APIPath, restClientConfig.UserAgent)
log.Debug().
Str("host", restClientConfig.Host).
Str("api-path", restClientConfig.APIPath).
Str("user-agent", restClientConfig.UserAgent).
Msg("K8s client config.")
return &Provider{
clientSet: clientSet,
@@ -181,7 +185,7 @@ type HubOptions struct {
MaxEntriesDBSizeBytes int64
Resources models.Resources
ImagePullPolicy core.PullPolicy
LogLevel logging.Level
LogLevel zerolog.Level
Profiler bool
}
@@ -806,8 +810,14 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
return nil
}
func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, hubPodIp string, nodeNames []string, serviceAccountName string, resources models.Resources, imagePullPolicy core.PullPolicy, kubesharkApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, serviceMesh bool, tls bool, maxLiveStreams int) error {
log.Printf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeNames), namespace, daemonSetName, podImage, tapperPodName)
func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, hubPodIp string, nodeNames []string, serviceAccountName string, resources models.Resources, imagePullPolicy core.PullPolicy, kubesharkApiFilteringOptions api.TrafficFilteringOptions, logLevel zerolog.Level, serviceMesh bool, tls bool, maxLiveStreams int) error {
log.Debug().
Int("node-count", len(nodeNames)).
Str("namespace", namespace).
Str("daemonset-name", daemonSetName).
Str("image", podImage).
Str("pod-name", tapperPodName).
Msg("Applying tapper DaemonSets.")
if len(nodeNames) == 0 {
return fmt.Errorf("daemon set %s must tap at least 1 pod", daemonSetName)
@@ -1159,7 +1169,7 @@ func (provider *Provider) ListManagedRoleBindings(ctx context.Context, namespace
func (provider *Provider) ValidateNotProxy() error {
kubernetesUrl, err := url.Parse(provider.clientConfig.Host)
if err != nil {
log.Printf("ValidateNotProxy - error while parsing kubernetes host, err: %v", err)
log.Debug().Err(err).Msg("While parsing Kubernetes host!")
return nil
}
@@ -1184,7 +1194,7 @@ func (provider *Provider) ValidateNotProxy() error {
func (provider *Provider) GetKubernetesVersion() (*semver.SemVersion, error) {
serverVersion, err := provider.clientSet.ServerVersion()
if err != nil {
log.Printf("error while getting kubernetes server version, err: %v", err)
log.Debug().Err(err).Msg("While getting Kubernetes server version!")
return nil, err
}
@@ -1211,7 +1221,7 @@ func ValidateKubernetesVersion(serverVersionSemVer *semver.SemVersion) error {
}
func loadKubernetesConfiguration(kubeConfigPath string, context string) clientcmd.ClientConfig {
log.Printf("Using kube config %s", kubeConfigPath)
log.Info().Str("path", kubeConfigPath).Msg("Using kubeconfig:")
configPathList := filepath.SplitList(kubeConfigPath)
configLoadingRules := &clientcmd.ClientConfigLoadingRules{}
if len(configPathList) <= 1 {

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"log"
"net"
"net/http"
"net/url"
@@ -12,10 +11,10 @@ import (
"strings"
"time"
"github.com/rs/zerolog/log"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/kubectl/pkg/proxy"
)
@@ -23,7 +22,13 @@ const k8sProxyApiPrefix = "/"
const kubesharkServicePort = 80
func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, dstPort uint16, kubesharkNamespace string, kubesharkServiceName string, cancel context.CancelFunc) (*http.Server, error) {
log.Printf("Starting proxy - namespace: [%v], service name: [%s], port: [%d:%d]\n", kubesharkNamespace, kubesharkServiceName, srcPort, dstPort)
log.Info().
Str("namespace", kubesharkNamespace).
Str("service-name", kubesharkServiceName).
Int("src-port", int(srcPort)).
Int("dst-port", int(dstPort)).
Msg("Starting proxy...")
filter := &proxy.FilterServer{
AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE),
RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE),
@@ -50,7 +55,7 @@ func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16,
go func() {
if err := server.Serve(l); err != nil && err != http.ErrServerClosed {
log.Printf("Error creating proxy, %v", err)
log.Error().Err(err).Msg("While creating proxy!")
cancel()
}
}()
@@ -105,7 +110,12 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *re
podName := pods[0].Name
log.Printf("Starting proxy using port-forward method. namespace: [%v], pod name: [%s], %d:%d", namespace, podName, srcPort, dstPort)
log.Info().
Str("namespace", namespace).
Str("pod-name", podName).
Int("src-port", int(srcPort)).
Int("dst-port", int(dstPort)).
Msg("Starting proxy using port-forward method...")
dialer, err := getHttpDialer(kubernetesProvider, namespace, podName)
if err != nil {
@@ -122,7 +132,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *re
go func() {
if err = forwarder.ForwardPorts(); err != nil {
log.Printf("kubernetes port-forwarding error: %v", err)
log.Error().Err(err).Msg("While Kubernetes port-forwarding!")
cancel()
}
}()
@@ -133,7 +143,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *re
func getHttpDialer(kubernetesProvider *Provider, namespace string, podName string) (httpstream.Dialer, error) {
roundTripper, upgrader, err := spdy.RoundTripperFor(&kubernetesProvider.clientConfig)
if err != nil {
log.Printf("Error creating http dialer")
log.Error().Err(err).Msg("While creating HTTP dialer!")
return nil, err
}
@@ -144,7 +154,9 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin
path := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/portforward", clientConfigHostUrl.Path, namespace, podName)
serverURL := url.URL{Scheme: "https", Path: path, Host: clientConfigHostUrl.Host}
log.Printf("Http dialer url %v", serverURL)
log.Debug().
Str("url", serverURL.String()).
Msg("HTTP dialer URL:")
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL), nil
}

View File

@@ -4,12 +4,11 @@ import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/kubeshark/kubeshark/debounce"
"github.com/rs/zerolog/log"
"k8s.io/apimachinery/pkg/watch"
)
@@ -57,13 +56,13 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
} else {
if !watchRestartDebouncer.IsOn() {
if err := watchRestartDebouncer.SetOn(); err != nil {
log.Print(err)
log.Error().Err(err)
}
log.Print("k8s watch channel closed, restarting watcher")
log.Warn().Msg("K8s watch channel closed, restarting watcher...")
time.Sleep(time.Second * 5)
continue
} else {
errorChan <- errors.New("k8s watch unstable, closes frequently")
errorChan <- errors.New("K8s watch unstable, closes frequently")
break
}
}