feat!: migrate api to grpc (#386)

* feat: migrate api to grpc

Signed-off-by: Matthis Holleville <matthish29@gmail.com>

* feat: use status.Code instead grpc.Code in log

Signed-off-by: Matthis Holleville <matthish29@gmail.com>

---------

Signed-off-by: Matthis Holleville <matthish29@gmail.com>
Co-authored-by: Alex Jones <alexsimonjones@gmail.com>
This commit is contained in:
Matthis
2023-05-09 09:36:44 +02:00
committed by GitHub
parent b6b06123db
commit 9998e7620d
6 changed files with 774 additions and 131 deletions

View File

@@ -15,23 +15,33 @@ package server
import (
json "encoding/json"
"errors"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"time"
"github.com/fatih/color"
"github.com/k8sgpt-ai/k8sgpt/pkg/analysis"
rpc "buf.build/gen/go/k8sgpt-ai/k8sgpt/grpc/go/schema/v1/schemav1grpc"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type Config struct {
Port string
MetricsPort string
Backend string
Key string
Token string
Output string
maxConcurrency int
Handler *handler
Logger *zap.Logger
metricsServer *http.Server
}
type Health struct {
@@ -46,66 +56,46 @@ var health = Health{
Failure: 0,
}
type Result struct {
Analysis []analysis.Analysis `json:"analysis"`
}
func (s *Config) analyzeHandler(w http.ResponseWriter, r *http.Request) {
namespace := r.URL.Query().Get("namespace")
explain := getBoolParam(r.URL.Query().Get("explain"))
anonymize := getBoolParam(r.URL.Query().Get("anonymize"))
nocache := getBoolParam(r.URL.Query().Get("nocache"))
language := r.URL.Query().Get("language")
var err error
s.maxConcurrency, err = strconv.Atoi(r.URL.Query().Get("maxConcurrency"))
if err != nil {
s.maxConcurrency = 10
}
s.Output = r.URL.Query().Get("output")
if s.Output == "" {
s.Output = "json"
}
config, err := analysis.NewAnalysis(s.Backend, language, []string{}, namespace, nocache, explain, s.maxConcurrency)
if err != nil {
health.Failure++
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
config.RunAnalysis()
if explain {
err := config.GetAIResults(s.Output, anonymize)
if err != nil {
health.Failure++
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
out, err := config.PrintOutput(s.Output)
if err != nil {
health.Failure++
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
health.Success++
fmt.Fprintf(w, string(out))
}
func (s *Config) Serve() error {
handler := loggingMiddleware(http.DefaultServeMux)
http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/analyze", s.analyzeHandler)
http.HandleFunc("/healthz", s.healthzHandler)
color.Green("Starting server on port %s", s.Port)
err := http.ListenAndServe(":"+s.Port, handler)
var lis net.Listener
var err error
address := fmt.Sprintf(":%s", s.Port)
lis, err = net.Listen("tcp", address)
if err != nil {
fmt.Printf("error starting server: %s\n", err)
return err
}
s.Logger.Info(fmt.Sprintf("binding api to %s", s.Port))
grpcServerUnaryInterceptor := grpc.UnaryInterceptor(logInterceptor(s.Logger))
grpcServer := grpc.NewServer(grpcServerUnaryInterceptor)
reflection.Register(grpcServer)
rpc.RegisterServerServer(grpcServer, s.Handler)
if err := grpcServer.Serve(
lis,
); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
}
func (s *Config) ServeMetrics() error {
s.Logger.Info(fmt.Sprintf("binding metrics to %s", s.MetricsPort))
s.metricsServer = &http.Server{
ReadHeaderTimeout: 3 * time.Second,
Addr: fmt.Sprintf(":%s", s.MetricsPort),
}
s.metricsServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/healthz":
w.WriteHeader(http.StatusOK)
case "/metrics":
promhttp.Handler().ServeHTTP(w, r)
default:
w.WriteHeader(http.StatusNotFound)
}
})
if err := s.metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil