feat: initial Prometheus analyzers (#855)

* feat: initial Prometheus analyzers

Added a prometheus integration with two analyzers:
1. PrometheusConfigValidate
2. PrometheusConfigRelabelReport

The integration does not deploy any Prometheus stack in the cluster.
Instead, it searches the provided --namespace for a Prometheus
configuration stored in a ConfigMap or Secret. If one is found, the
integration unmarshals it into memory and runs the analyzers on it.

PrometheusConfigValidate checks whether the discovered Prometheus
configuration is valid and reports any errors.

PrometheusConfigRelabelReport tries to distill the scrape-config
relabeling rules into a concise label set, per job, that targets need
to carry in order to be scraped. This analyzer is unconventional in
that its findings do not necessarily indicate issues with the config.
It merely tries to give a human-readable explanation of the relabel
rules it discovers, leaning on the LLM and prompt.

Tested on both kube-prometheus and Google Managed Prometheus
stacks.

Signed-off-by: Daniel Clark <danielclark@google.com>

* review: feedback cycle 1

Simplify ConfigValidate prompt and add comments.

Signed-off-by: Daniel Clark <danielclark@google.com>

* review: feedback cycle 2

Add Prometheus configuration discovery to integration activate command.

Also improve logging to make this clearer to users.

Signed-off-by: Daniel Clark <danielclark@google.com>

---------

Signed-off-by: Daniel Clark <danielclark@google.com>
Daniel Clark
2024-01-12 04:58:09 -05:00
committed by GitHub
parent 4106d39c32
commit 45fa827c04
7 changed files with 1810 additions and 8 deletions


@@ -0,0 +1,290 @@
package prometheus

import (
	"bytes"
	"compress/gzip"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"path/filepath"
	"strings"

	"github.com/k8sgpt-ai/k8sgpt/pkg/common"
	"github.com/k8sgpt-ai/k8sgpt/pkg/util"
	promconfig "github.com/prometheus/prometheus/config"
	yaml "gopkg.in/yaml.v2"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

const (
	prometheusContainerName     = "prometheus"
	configReloaderContainerName = "config-reloader"
	prometheusConfigFlag        = "--config.file="
	configReloaderConfigFlag    = "--config-file="
)

var prometheusPodLabels = map[string]string{
	"app":                    "prometheus",
	"app.kubernetes.io/name": "prometheus",
}

type ConfigAnalyzer struct {
}

// podConfig groups a specific pod with the Prometheus configuration and any
// other state used for informing the common.Result.
type podConfig struct {
	b   []byte
	pod *corev1.Pod
}
func (c *ConfigAnalyzer) Analyze(a common.Analyzer) ([]common.Result, error) {
	ctx := a.Context
	client := a.Client.GetClient()
	namespace := a.Namespace
	kind := ConfigValidate

	podConfigs, err := findPrometheusPodConfigs(ctx, client, namespace)
	if err != nil {
		return nil, err
	}

	var preAnalysis = map[string]common.PreAnalysis{}
	for _, pc := range podConfigs {
		var failures []common.Failure
		pod := pc.pod

		// Check upstream validation.
		// The Prometheus configuration structs do not generally have validation
		// methods and embed their validation logic in the UnmarshalYAML methods.
		config, err := unmarshalPromConfigBytes(pc.b)
		if err != nil {
			failures = append(failures, common.Failure{
				Text: fmt.Sprintf("error validating Prometheus YAML configuration: %s", err),
			})
		}
		_, err = yaml.Marshal(config)
		if err != nil {
			failures = append(failures, common.Failure{
				Text: fmt.Sprintf("error validating Prometheus struct configuration: %s", err),
			})
		}

		// Check for an empty scrape config. Guard against a nil config, which
		// unmarshalPromConfigBytes can return when a gunzipped payload fails to parse.
		if config != nil && len(config.ScrapeConfigs) == 0 {
			failures = append(failures, common.Failure{
				Text: "no scrape configurations. Prometheus will not scrape any metrics.",
			})
		}
		if len(failures) > 0 {
			preAnalysis[fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)] = common.PreAnalysis{
				Pod:            *pod,
				FailureDetails: failures,
			}
		}
	}

	for key, value := range preAnalysis {
		var currentAnalysis = common.Result{
			Kind:  kind,
			Name:  key,
			Error: value.FailureDetails,
		}
		parent, _ := util.GetParent(a.Client, value.Pod.ObjectMeta)
		currentAnalysis.ParentObject = parent
		a.Results = append(a.Results, currentAnalysis)
	}

	return a.Results, nil
}
func configKey(namespace string, volume *corev1.Volume) (string, error) {
	if volume.ConfigMap != nil {
		return fmt.Sprintf("configmap/%s/%s", namespace, volume.ConfigMap.Name), nil
	} else if volume.Secret != nil {
		return fmt.Sprintf("secret/%s/%s", namespace, volume.Secret.SecretName), nil
	} else {
		return "", errors.New("volume must be either a ConfigMap or a Secret")
	}
}
func findPrometheusPodConfigs(ctx context.Context, client kubernetes.Interface, namespace string) ([]podConfig, error) {
	var configs []podConfig
	pods, err := findPrometheusPods(ctx, client, namespace)
	if err != nil {
		return nil, err
	}

	var configCache = make(map[string]bool)
	for _, pod := range pods {
		pod := pod // copy: taking &pod of the range variable would alias every entry (pre-Go 1.22)

		// Extract the volume holding the Prometheus config.
		volume, key, err := findPrometheusConfigVolumeAndKey(ctx, client, &pod)
		if err != nil {
			return nil, err
		}

		// Skip configs we have already processed.
		ck, err := configKey(pod.Namespace, volume)
		if err != nil {
			return nil, err
		}
		if _, ok := configCache[ck]; ok {
			continue
		}
		configCache[ck] = true

		// Extract the Prometheus config bytes from the volume.
		b, err := extractPrometheusConfigFromVolume(ctx, client, volume, pod.Namespace, key)
		if err != nil {
			return nil, err
		}
		configs = append(configs, podConfig{
			pod: &pod,
			b:   b,
		})
	}
	return configs, nil
}
func findPrometheusPods(ctx context.Context, client kubernetes.Interface, namespace string) ([]corev1.Pod, error) {
	var proms []corev1.Pod
	for k, v := range prometheusPodLabels {
		pods, err := util.GetPodListByLabels(client, namespace, map[string]string{
			k: v,
		})
		if err != nil {
			return nil, err
		}
		proms = append(proms, pods.Items...)
	}

	// If we still haven't found any Prometheus pods, make a last-ditch effort to
	// scan the namespace for "prometheus" containers.
	if len(proms) == 0 {
		pods, err := client.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{})
		if err != nil {
			return nil, err
		}
		for _, pod := range pods.Items {
			for _, c := range pod.Spec.Containers {
				if c.Name == prometheusContainerName {
					proms = append(proms, pod)
				}
			}
		}
	}
	return proms, nil
}
func findPrometheusConfigPath(ctx context.Context, client kubernetes.Interface, pod *corev1.Pod) (string, error) {
	var path string
	var err error
	for _, container := range pod.Spec.Containers {
		for _, arg := range container.Args {
			// Prefer the config-reloader container's config file, as it normally
			// references the ConfigMap or Secret volume mount.
			// Fall back to the prometheus container's flag if that's not found.
			if strings.HasPrefix(arg, prometheusConfigFlag) {
				// strings.TrimPrefix, not strings.TrimLeft: TrimLeft treats its
				// second argument as a character set, not a prefix.
				path = strings.TrimPrefix(arg, prometheusConfigFlag)
			}
			if strings.HasPrefix(arg, configReloaderConfigFlag) {
				path = strings.TrimPrefix(arg, configReloaderConfigFlag)
			}
		}
		if container.Name == configReloaderContainerName {
			return path, nil
		}
	}
	if path == "" {
		err = fmt.Errorf("prometheus config path not found in pod: %s", pod.Name)
	}
	return path, err
}
func findPrometheusConfigVolumeAndKey(ctx context.Context, client kubernetes.Interface, pod *corev1.Pod) (*corev1.Volume, string, error) {
	path, err := findPrometheusConfigPath(ctx, client, pod)
	if err != nil {
		return nil, "", err
	}

	// Find the volumeMount the config path points to.
	var volumeName = ""
	for _, container := range pod.Spec.Containers {
		for _, vm := range container.VolumeMounts {
			if strings.HasPrefix(path, vm.MountPath) {
				volumeName = vm.Name
				break
			}
		}
	}

	// Get the actual Volume from the name.
	for _, volume := range pod.Spec.Volumes {
		if volume.Name == volumeName {
			return &volume, filepath.Base(path), nil
		}
	}
	return nil, "", errors.New("volume for Prometheus config not found")
}
func extractPrometheusConfigFromVolume(ctx context.Context, client kubernetes.Interface, volume *corev1.Volume, namespace, key string) ([]byte, error) {
	var b []byte
	var ok bool

	// Check for a Secret volume.
	if vs := volume.Secret; vs != nil {
		s, err := client.CoreV1().Secrets(namespace).Get(ctx, vs.SecretName, v1.GetOptions{})
		if err != nil {
			return nil, err
		}
		b, ok = s.Data[key]
		if !ok {
			return nil, fmt.Errorf("unable to find file key in secret: %s", key)
		}
	}

	// Check for a ConfigMap volume.
	if vcm := volume.ConfigMap; vcm != nil {
		cm, err := client.CoreV1().ConfigMaps(namespace).Get(ctx, vcm.Name, v1.GetOptions{})
		if err != nil {
			return nil, err
		}
		s, ok := cm.Data[key]
		if !ok {
			return nil, fmt.Errorf("unable to find file key in configmap: %s", key)
		}
		b = []byte(s)
	}
	return b, nil
}
func unmarshalPromConfigBytes(b []byte) (*promconfig.Config, error) {
	var config promconfig.Config
	// Unmarshal the data into a Prometheus config.
	if err := yaml.Unmarshal(b, &config); err == nil {
		return &config, nil
		// If that failed, try gunzipping the data first.
	} else if content := http.DetectContentType(b); content == "application/x-gzip" {
		r, err := gzip.NewReader(bytes.NewBuffer(b))
		if err != nil {
			return &config, err
		}
		gunzipBytes, err := io.ReadAll(r)
		if err != nil {
			return &config, err
		}
		if err := yaml.Unmarshal(gunzipBytes, &config); err != nil {
			return nil, err
		}
		return &config, nil
	} else {
		return &config, err
	}
}
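
Since the discovery chain above is plain client-go, it can be exercised with a fake clientset. Here is a test-style sketch, not part of this commit: the object names are illustrative, and it assumes util.GetPodListByLabels issues a standard label-selector list.

package prometheus

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestFindPrometheusPodConfigs(t *testing.T) {
	// A bare-bones scrape config, mounted the way kube-prometheus typically does.
	cfg := `
scrape_configs:
- job_name: pods
  kubernetes_sd_configs:
  - role: pod
`
	client := fake.NewSimpleClientset(
		&corev1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{Name: "prom-config", Namespace: "monitoring"},
			Data:       map[string]string{"prometheus.yml": cfg},
		},
		&corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "prometheus-0",
				Namespace: "monitoring",
				Labels:    map[string]string{"app.kubernetes.io/name": "prometheus"},
			},
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{
					Name:         prometheusContainerName,
					Args:         []string{prometheusConfigFlag + "/etc/prometheus/prometheus.yml"},
					VolumeMounts: []corev1.VolumeMount{{Name: "config", MountPath: "/etc/prometheus"}},
				}},
				Volumes: []corev1.Volume{{
					Name: "config",
					VolumeSource: corev1.VolumeSource{
						ConfigMap: &corev1.ConfigMapVolumeSource{
							LocalObjectReference: corev1.LocalObjectReference{Name: "prom-config"},
						},
					},
				}},
			},
		},
	)
	// Discovery should surface exactly one podConfig for the pod above.
	got, err := findPrometheusPodConfigs(context.Background(), client, "monitoring")
	if err != nil {
		t.Fatal(err)
	}
	if len(got) != 1 {
		t.Fatalf("expected 1 pod config, got %d", len(got))
	}
}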


@@ -0,0 +1,105 @@
package prometheus

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/fatih/color"
	"github.com/k8sgpt-ai/k8sgpt/pkg/common"
	"github.com/k8sgpt-ai/k8sgpt/pkg/kubernetes"
	"github.com/spf13/viper"
)

const (
	ConfigValidate = "PrometheusConfigValidate"
	ConfigRelabel  = "PrometheusConfigRelabelReport"
)

type Prometheus struct {
}

func NewPrometheus() *Prometheus {
	return &Prometheus{}
}
func (p *Prometheus) Deploy(namespace string) error {
	// Nothing is deployed; the integration only discovers existing
	// Prometheus installations.
	color.Green("Activating prometheus integration...")
	// TODO(pintohutch): add timeout or inherit an upstream context
	// for better signal management.
	ctx := context.Background()
	kubecontext := viper.GetString("kubecontext")
	kubeconfig := viper.GetString("kubeconfig")
	client, err := kubernetes.NewClient(kubecontext, kubeconfig)
	if err != nil {
		color.Red("Error initialising kubernetes client: %v", err)
		os.Exit(1)
	}

	// We only care about existing deployments.
	// Try to find Prometheus configurations in the cluster using the provided namespace.
	//
	// Note: We could cache this state and inject it into the various analyzers
	// to save additional parsing later.
	// However, the state of the cluster can change between activation and analysis,
	// so we would want to run this again on each analyze call anyway.
	//
	// One consequence of this is that one can run `activate` in one namespace
	// and run `analyze` in another without issues, as long as Prometheus
	// is found in both.
	// We accept this as a trade-off for the time being, to avoid having the tool
	// manage Prometheus on behalf of users.
	podConfigs, err := findPrometheusPodConfigs(ctx, client.GetClient(), namespace)
	if err != nil {
		color.Red("Error discovering Prometheus workloads: %v", err)
		os.Exit(1)
	}
	if len(podConfigs) == 0 {
		color.Yellow(fmt.Sprintf(`Prometheus installation not found in namespace: %s.
Please ensure Prometheus is deployed to analyze.`, namespace))
		return errors.New("no prometheus installation found")
	}
	color.Green("Found existing installation")
	return nil
}
func (p *Prometheus) UnDeploy(_ string) error {
	// no-op
	// We only care about existing deployments.
	color.Yellow("Integration will leave Prometheus resources deployed. This is an effective no-op in the cluster.")
	return nil
}

func (p *Prometheus) AddAnalyzer(mergedMap *map[string]common.IAnalyzer) {
	(*mergedMap)[ConfigValidate] = &ConfigAnalyzer{}
	(*mergedMap)[ConfigRelabel] = &RelabelAnalyzer{}
}

func (p *Prometheus) GetAnalyzerName() []string {
	return []string{ConfigValidate, ConfigRelabel}
}

func (p *Prometheus) GetNamespace() (string, error) {
	return "", nil
}

func (p *Prometheus) OwnsAnalyzer(analyzer string) bool {
	return (analyzer == ConfigValidate) || (analyzer == ConfigRelabel)
}
func (p *Prometheus) IsActivate() bool {
	activeFilters := viper.GetStringSlice("active_filters")
	for _, filter := range p.GetAnalyzerName() {
		for _, af := range activeFilters {
			if af == filter {
				return true
			}
		}
	}
	return false
}
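
For context, a minimal sketch of how these hooks compose: AddAnalyzer merges the two analyzers into the shared map, and IsActivate reports whether either name is among viper's active_filters. This is a hypothetical harness, not part of the commit, and the pkg/integration/prometheus import path is an assumption.

package main

import (
	"fmt"

	"github.com/k8sgpt-ai/k8sgpt/pkg/common"
	"github.com/k8sgpt-ai/k8sgpt/pkg/integration/prometheus" // assumed import path
	"github.com/spf13/viper"
)

func main() {
	p := prometheus.NewPrometheus()

	// Merge the integration's analyzers into the map the core analysis uses.
	analyzers := map[string]common.IAnalyzer{}
	p.AddAnalyzer(&analyzers)
	for name := range analyzers {
		fmt.Println("registered:", name) // PrometheusConfigValidate, PrometheusConfigRelabelReport
	}

	// IsActivate becomes true once either analyzer name is an active filter.
	viper.Set("active_filters", []string{"PrometheusConfigValidate"})
	fmt.Println("active:", p.IsActivate()) // true
}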


@@ -0,0 +1,85 @@
package prometheus

import (
	"fmt"

	"github.com/k8sgpt-ai/k8sgpt/pkg/common"
	"github.com/k8sgpt-ai/k8sgpt/pkg/util"
	discoverykube "github.com/prometheus/prometheus/discovery/kubernetes"
	"gopkg.in/yaml.v2"
)

type RelabelAnalyzer struct {
}
func (r *RelabelAnalyzer) Analyze(a common.Analyzer) ([]common.Result, error) {
	ctx := a.Context
	client := a.Client.GetClient()
	namespace := a.Namespace
	kind := ConfigRelabel

	podConfigs, err := findPrometheusPodConfigs(ctx, client, namespace)
	if err != nil {
		return nil, err
	}

	var preAnalysis = map[string]common.PreAnalysis{}
	for _, pc := range podConfigs {
		var failures []common.Failure
		pod := pc.pod

		// Check upstream validation.
		// The Prometheus configuration structs do not generally have validation
		// methods and embed their validation logic in the UnmarshalYAML methods.
		config, _ := unmarshalPromConfigBytes(pc.b)
		if config == nil {
			// Skip configs that failed to parse entirely; ConfigValidate reports those.
			continue
		}

		// Limit output for brevity.
		limit := 6
		i := 0
		for _, sc := range config.ScrapeConfigs {
			if i == limit {
				break
			}
			if sc == nil {
				continue
			}
			brc, _ := yaml.Marshal(sc.RelabelConfigs)
			var bsd []byte
			for _, cfg := range sc.ServiceDiscoveryConfigs {
				ks, ok := cfg.(*discoverykube.SDConfig)
				if !ok {
					continue
				}
				bsd, _ = yaml.Marshal(ks)
			}
			// Skip the relabel analysis if the scrape config
			// or service discovery config is empty.
			if len(brc) == 0 || len(bsd) == 0 {
				continue
			}
			failures = append(failures, common.Failure{
				Text: fmt.Sprintf("job_name:\n%s\nrelabel_configs:\n%s\nkubernetes_sd_configs:\n%s\n", sc.JobName, string(brc), string(bsd)),
			})
			i++
		}
		if len(failures) > 0 {
			preAnalysis[fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)] = common.PreAnalysis{
				Pod:            *pod,
				FailureDetails: failures,
			}
		}
	}

	for key, value := range preAnalysis {
		var currentAnalysis = common.Result{
			Kind:  kind,
			Name:  key,
			Error: value.FailureDetails,
		}
		parent, _ := util.GetParent(a.Client, value.Pod.ObjectMeta)
		currentAnalysis.ParentObject = parent
		a.Results = append(a.Results, currentAnalysis)
	}

	return a.Results, nil
}
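
To make the relabel report concrete, here is a standalone sketch of the marshaling the analyzer performs, fed by an inline config instead of a discovered one. The YAML is an illustrative example, not from the commit; it assumes the pinned prometheus/prometheus version registers the kubernetes SD type on import, as the analyzer's own import does.

package main

import (
	"fmt"

	promconfig "github.com/prometheus/prometheus/config"
	discoverykube "github.com/prometheus/prometheus/discovery/kubernetes"
	yaml "gopkg.in/yaml.v2"
)

// An illustrative config: one kubernetes_sd job that keeps only pods labeled app=my-app.
const cfg = `
scrape_configs:
- job_name: pods
  kubernetes_sd_configs:
  - role: pod
  relabel_configs:
  - source_labels: [__meta_kubernetes_pod_label_app]
    regex: my-app
    action: keep
`

func main() {
	var c promconfig.Config
	if err := yaml.Unmarshal([]byte(cfg), &c); err != nil {
		panic(err)
	}
	// Mirror the Failure text the analyzer hands to the LLM: the job name plus
	// its relabel and kubernetes SD configs, re-marshaled back to YAML.
	for _, sc := range c.ScrapeConfigs {
		brc, _ := yaml.Marshal(sc.RelabelConfigs)
		var bsd []byte
		for _, d := range sc.ServiceDiscoveryConfigs {
			if ks, ok := d.(*discoverykube.SDConfig); ok {
				bsd, _ = yaml.Marshal(ks)
			}
		}
		fmt.Printf("job_name:\n%s\nrelabel_configs:\n%s\nkubernetes_sd_configs:\n%s\n", sc.JobName, brc, bsd)
	}
}

Running this prints the same per-job digest the analyzer emits as Failure text, which is what the prompt then asks the LLM to explain in plain language.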