kubeshark/agent/pkg/resolver/resolver.go

package resolver

import (
	"context"
	"errors"
	"fmt"

	cmap "github.com/orcaman/concurrent-map"
	"github.com/up9inc/mizu/logger"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
)

const (
	kubClientNullString = "None"
)

// Resolver watches Kubernetes services, endpoints and pods and maintains a
// map from IP addresses (and ip:port pairs) to resolved service hostnames.
type Resolver struct {
	clientConfig *restclient.Config
	clientSet    *kubernetes.Clientset
	nameMap      cmap.ConcurrentMap
	serviceMap   cmap.ConcurrentMap
	isStarted    bool
	errOut       chan error
	namespace    string
}

// ResolvedObjectInfo holds the resolved hostname and namespace for an address.
type ResolvedObjectInfo struct {
	FullAddress string
	Namespace   string
}

// Start launches the service, endpoint and pod watch loops once; repeated
// calls are no-ops.
func (resolver *Resolver) Start(ctx context.Context) {
	if !resolver.isStarted {
		resolver.isStarted = true

		go resolver.infiniteErrorHandleRetryFunc(ctx, resolver.watchServices)
		go resolver.infiniteErrorHandleRetryFunc(ctx, resolver.watchEndpoints)
		go resolver.infiniteErrorHandleRetryFunc(ctx, resolver.watchPods)
	}
}

// Resolve returns the resolved info for the given address, or nil when the
// address is unknown.
func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo {
	resolvedName, isFound := resolver.nameMap.Get(name)
	if !isFound {
		return nil
	}
	return resolvedName.(*ResolvedObjectInfo)
}

// GetMap exposes the underlying address-to-name map.
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
	return resolver.nameMap
}

// CheckIsServiceIP reports whether the given address is a known service ClusterIP.
func (resolver *Resolver) CheckIsServiceIP(address string) bool {
	_, isFound := resolver.serviceMap.Get(address)
	return isFound
}
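
// Usage sketch (a minimal example, assuming a Resolver has already been
// constructed and Start has been called; the addresses below are made up):
//
//	if info := resolver.Resolve("10.0.12.34:8080"); info != nil {
//		fmt.Printf("%s belongs to namespace %s\n", info.FullAddress, info.Namespace)
//	}
//	if resolver.CheckIsServiceIP("10.96.0.10") {
//		// the address is a service ClusterIP rather than a pod IP
//	}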

func (resolver *Resolver) watchPods(ctx context.Context) error {
	// empty namespace makes the client watch all namespaces
	watcher, err := resolver.clientSet.CoreV1().Pods(resolver.namespace).Watch(ctx, metav1.ListOptions{Watch: true})
	if err != nil {
		return err
	}
	for {
		select {
		case event := <-watcher.ResultChan():
			if event.Object == nil {
				return errors.New("error in kubectl pod watch")
			}
			if event.Type == watch.Deleted {
				pod := event.Object.(*corev1.Pod)
				resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type)
			}
		case <-ctx.Done():
			watcher.Stop()
			return nil
		}
	}
}

func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
	// empty namespace makes the client watch all namespaces
	watcher, err := resolver.clientSet.CoreV1().Endpoints(resolver.namespace).Watch(ctx, metav1.ListOptions{Watch: true})
	if err != nil {
		return err
	}
	for {
		select {
		case event := <-watcher.ResultChan():
			if event.Object == nil {
				return errors.New("error in kubectl endpoint watch")
			}
			endpoint := event.Object.(*corev1.Endpoints)
			serviceHostname := fmt.Sprintf("%s.%s", endpoint.Name, endpoint.Namespace)
			if endpoint.Subsets != nil {
				for _, subset := range endpoint.Subsets {
					var ports []int32
					if subset.Ports != nil {
						for _, portMapping := range subset.Ports {
							if portMapping.Port > 0 {
								ports = append(ports, portMapping.Port)
							}
						}
					}
					if subset.Addresses != nil {
						for _, address := range subset.Addresses {
							resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type)
							for _, port := range ports {
								ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
								resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type)
							}
						}
					}
				}
			}
		case <-ctx.Done():
			watcher.Stop()
			return nil
		}
	}
}

func (resolver *Resolver) watchServices(ctx context.Context) error {
	// empty namespace makes the client watch all namespaces
	watcher, err := resolver.clientSet.CoreV1().Services(resolver.namespace).Watch(ctx, metav1.ListOptions{Watch: true})
	if err != nil {
		return err
	}
	for {
		select {
		case event := <-watcher.ResultChan():
			if event.Object == nil {
				return errors.New("error in kubectl service watch")
			}
			service := event.Object.(*corev1.Service)
			serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
			if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
				resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
				if service.Spec.Ports != nil {
					for _, port := range service.Spec.Ports {
						if port.Port > 0 {
							resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type)
						}
					}
				}
				resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
			}
			if service.Status.LoadBalancer.Ingress != nil {
				for _, ingress := range service.Status.LoadBalancer.Ingress {
					resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type)
				}
			}
		case <-ctx.Done():
			watcher.Stop()
			return nil
		}
	}
}

func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
	if eventType == watch.Deleted {
		resolver.nameMap.Remove(resolved)
		resolver.nameMap.Remove(key)
		logger.Log.Infof("setting %s=nil", key)
	} else {
		resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
		resolver.nameMap.Set(resolved, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
		logger.Log.Infof("setting %s=%s", key, resolved)
	}
}

func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) {
	if eventType == watch.Deleted {
		resolver.serviceMap.Remove(key)
	} else {
		// Record the ClusterIP in serviceMap so CheckIsServiceIP can find it;
		// writing it to nameMap here would leave serviceMap permanently empty.
		resolver.serviceMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
	}
}

// infiniteErrorHandleRetryFunc re-runs the given watch function until the
// context ends, forwarding errors to errOut and giving up on Forbidden
// (permission) errors.
func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun func(ctx context.Context) error) {
	for {
		err := fun(ctx)
		if err != nil {
			resolver.errOut <- err

			var statusError *k8serrors.StatusError
			if errors.As(err, &statusError) {
				if statusError.ErrStatus.Reason == metav1.StatusReasonForbidden {
					logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v", err)
					return
				}
			}
		}
		if ctx.Err() != nil { // context was cancelled or errored
			return
		}
	}
}
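
// Wiring sketch (an assumption for illustration only; the Resolver is
// constructed outside this file, so the field values below are hypothetical).
// From inside this package, something along these lines builds a resolver for
// in-cluster use and starts the watch loops:
//
//	config, err := restclient.InClusterConfig()
//	if err != nil {
//		panic(err)
//	}
//	clientSet, err := kubernetes.NewForConfig(config)
//	if err != nil {
//		panic(err)
//	}
//	resolver := &Resolver{
//		clientConfig: config,
//		clientSet:    clientSet,
//		nameMap:      cmap.New(),
//		serviceMap:   cmap.New(),
//		errOut:       make(chan error, 100),
//		namespace:    "", // empty string watches all namespaces
//	}
//	resolver.Start(context.Background())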