mirror of
https://github.com/k8sgpt-ai/k8sgpt.git
synced 2025-09-24 20:57:29 +00:00
fix: migrated to more actively maintained mcp golang lib and added AI explain (#1557)
* migrated to more actively maintained mcp golang lib and added AI explain support for mcp mode Signed-off-by: Umesh Kaul <umeshkaul@gmail.com> * added a makefile option to create local docker image for testing Signed-off-by: Umesh Kaul <umeshkaul@gmail.com> * fixed linter errors and made anonymize as an arg Signed-off-by: Umesh Kaul <umeshkaul@gmail.com> --------- Signed-off-by: Umesh Kaul <umeshkaul@gmail.com> Co-authored-by: Alex Jones <alexsimonjones@gmail.com>
This commit is contained in:
@@ -40,12 +40,20 @@ type AnalyzeRequest struct {
|
||||
WithStats bool `json:"withStats,omitempty"`
|
||||
}
|
||||
|
||||
// AnalyzeResponse represents the output of the analyze tool
|
||||
type AnalyzeResponse struct {
|
||||
Content []struct {
|
||||
Text string `json:"text"`
|
||||
Type string `json:"type"`
|
||||
} `json:"content"`
|
||||
// JSONRPCResponse represents the JSON-RPC response format
|
||||
type JSONRPCResponse struct {
|
||||
JSONRPC string `json:"jsonrpc"`
|
||||
ID int `json:"id"`
|
||||
Result struct {
|
||||
Content []struct {
|
||||
Text string `json:"text"`
|
||||
Type string `json:"type"`
|
||||
} `json:"content"`
|
||||
} `json:"result,omitempty"`
|
||||
Error *struct {
|
||||
Code int `json:"code"`
|
||||
Message string `json:"message"`
|
||||
} `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
@@ -65,23 +73,89 @@ func main() {
|
||||
MaxConcurrency: 10,
|
||||
}
|
||||
|
||||
// Convert request to JSON
|
||||
reqJSON, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to marshal request: %v", err)
|
||||
}
|
||||
// Note: req is now used directly in the JSON-RPC request
|
||||
|
||||
// Create HTTP client with timeout
|
||||
client := &http.Client{
|
||||
Timeout: 5 * time.Minute,
|
||||
}
|
||||
|
||||
// Send request to MCP server
|
||||
resp, err := client.Post(
|
||||
fmt.Sprintf("http://localhost:%s/mcp/analyze", *serverPort),
|
||||
// First, initialize the session
|
||||
initRequest := map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "initialize",
|
||||
"params": map[string]interface{}{
|
||||
"protocolVersion": "2025-03-26",
|
||||
"capabilities": map[string]interface{}{
|
||||
"tools": map[string]interface{}{},
|
||||
"resources": map[string]interface{}{},
|
||||
"prompts": map[string]interface{}{},
|
||||
},
|
||||
"clientInfo": map[string]interface{}{
|
||||
"name": "k8sgpt-client",
|
||||
"version": "1.0.0",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
initData, err := json.Marshal(initRequest)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to marshal init request: %v", err)
|
||||
}
|
||||
|
||||
// Send initialization request
|
||||
initResp, err := client.Post(
|
||||
fmt.Sprintf("http://localhost:%s/mcp", *serverPort),
|
||||
"application/json",
|
||||
bytes.NewBuffer(reqJSON),
|
||||
bytes.NewBuffer(initData),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to send init request: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := initResp.Body.Close(); err != nil {
|
||||
log.Printf("Error closing init response body: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Extract session ID from response headers
|
||||
sessionID := initResp.Header.Get("Mcp-Session-Id")
|
||||
if sessionID == "" {
|
||||
log.Println("Warning: No session ID received from server")
|
||||
}
|
||||
|
||||
// Create JSON-RPC request for analyze
|
||||
jsonRPCRequest := map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 2,
|
||||
"method": "tools/call",
|
||||
"params": map[string]interface{}{
|
||||
"name": "analyze",
|
||||
"arguments": req,
|
||||
},
|
||||
}
|
||||
|
||||
// Convert to JSON
|
||||
jsonRPCData, err := json.Marshal(jsonRPCRequest)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to marshal JSON-RPC request: %v", err)
|
||||
}
|
||||
|
||||
// Create request with session ID if available
|
||||
httpReq, err := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/mcp", *serverPort), bytes.NewBuffer(jsonRPCData))
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create request: %v", err)
|
||||
}
|
||||
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
httpReq.Header.Set("Accept", "application/json,text/event-stream")
|
||||
if sessionID != "" {
|
||||
httpReq.Header.Set("Mcp-Session-Id", sessionID)
|
||||
}
|
||||
|
||||
// Send request to MCP server
|
||||
resp, err := client.Do(httpReq)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to send request: %v", err)
|
||||
}
|
||||
@@ -99,15 +173,17 @@ func main() {
|
||||
fmt.Printf("Raw response: %s\n", string(body))
|
||||
|
||||
// Parse response
|
||||
var analyzeResp AnalyzeResponse
|
||||
if err := json.Unmarshal(body, &analyzeResp); err != nil {
|
||||
var jsonRPCResp JSONRPCResponse
|
||||
if err := json.Unmarshal(body, &jsonRPCResp); err != nil {
|
||||
log.Fatalf("Failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
// Print results
|
||||
fmt.Println("Analysis Results:")
|
||||
if len(analyzeResp.Content) > 0 {
|
||||
fmt.Println(analyzeResp.Content[0].Text)
|
||||
if jsonRPCResp.Error != nil {
|
||||
fmt.Printf("Error: %s (code: %d)\n", jsonRPCResp.Error.Message, jsonRPCResp.Error.Code)
|
||||
} else if len(jsonRPCResp.Result.Content) > 0 {
|
||||
fmt.Println(jsonRPCResp.Result.Content[0].Text)
|
||||
} else {
|
||||
fmt.Println("No results returned")
|
||||
}
|
||||
|
@@ -17,88 +17,205 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
schemav1 "buf.build/gen/go/k8sgpt-ai/k8sgpt/protocolbuffers/go/schema/v1"
|
||||
"github.com/k8sgpt-ai/k8sgpt/pkg/ai"
|
||||
"github.com/k8sgpt-ai/k8sgpt/pkg/analysis"
|
||||
"github.com/k8sgpt-ai/k8sgpt/pkg/kubernetes"
|
||||
"github.com/k8sgpt-ai/k8sgpt/pkg/server/config"
|
||||
mcp_golang "github.com/metoro-io/mcp-golang"
|
||||
mcp_http "github.com/metoro-io/mcp-golang/transport/http"
|
||||
"github.com/metoro-io/mcp-golang/transport/stdio"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
"github.com/mark3labs/mcp-go/server"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// MCPServer represents an MCP server for k8sgpt
|
||||
type MCPServer struct {
|
||||
server *mcp_golang.Server
|
||||
port string
|
||||
aiProvider *ai.AIProvider
|
||||
useHTTP bool
|
||||
logger *zap.Logger
|
||||
// K8sGptMCPServer represents an MCP server for k8sgpt
|
||||
type K8sGptMCPServer struct {
|
||||
server *server.MCPServer
|
||||
port string
|
||||
aiProvider *ai.AIProvider
|
||||
useHTTP bool
|
||||
logger *zap.Logger
|
||||
httpServer *server.StreamableHTTPServer
|
||||
stdioServer *server.StdioServer
|
||||
}
|
||||
|
||||
// NewMCPServer creates a new MCP server
|
||||
func NewMCPServer(port string, aiProvider *ai.AIProvider, useHTTP bool, logger *zap.Logger) (*MCPServer, error) {
|
||||
opts := []mcp_golang.ServerOptions{
|
||||
mcp_golang.WithName("k8sgpt"),
|
||||
mcp_golang.WithVersion("1.0.0"),
|
||||
func NewMCPServer(port string, aiProvider *ai.AIProvider, useHTTP bool, logger *zap.Logger) (*K8sGptMCPServer, error) {
|
||||
opts := []server.ServerOption{
|
||||
server.WithToolCapabilities(true),
|
||||
server.WithResourceCapabilities(true, false),
|
||||
server.WithPromptCapabilities(false),
|
||||
}
|
||||
|
||||
var server *mcp_golang.Server
|
||||
if useHTTP {
|
||||
logger.Info("starting MCP server with http transport on port", zap.String("port", port))
|
||||
httpTransport := mcp_http.NewHTTPTransport("/mcp").WithAddr(":" + port)
|
||||
server = mcp_golang.NewServer(httpTransport, opts...)
|
||||
} else {
|
||||
server = mcp_golang.NewServer(stdio.NewStdioServerTransport(), opts...)
|
||||
}
|
||||
|
||||
return &MCPServer{
|
||||
server: server,
|
||||
// Create the MCP server
|
||||
mcpServer := server.NewMCPServer("k8sgpt", "1.0.0", opts...)
|
||||
var k8sGptMCPServer = &K8sGptMCPServer{
|
||||
server: mcpServer,
|
||||
port: port,
|
||||
aiProvider: aiProvider,
|
||||
useHTTP: useHTTP,
|
||||
logger: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Register tools and resources immediately
|
||||
if err := k8sGptMCPServer.registerToolsAndResources(); err != nil {
|
||||
return nil, fmt.Errorf("failed to register tools and resources: %v", err)
|
||||
}
|
||||
|
||||
if useHTTP {
|
||||
// Create HTTP server with streamable transport
|
||||
httpOpts := []server.StreamableHTTPOption{
|
||||
server.WithLogger(&zapLoggerAdapter{logger: logger}),
|
||||
}
|
||||
|
||||
httpServer := server.NewStreamableHTTPServer(mcpServer, httpOpts...)
|
||||
|
||||
// Launch the HTTP server directly
|
||||
go func() {
|
||||
logger.Info("Starting MCP HTTP server", zap.String("port", port))
|
||||
if err := httpServer.Start(":" + port); err != nil {
|
||||
logger.Fatal("MCP HTTP server failed", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
return &K8sGptMCPServer{
|
||||
server: mcpServer,
|
||||
port: port,
|
||||
aiProvider: aiProvider,
|
||||
useHTTP: useHTTP,
|
||||
logger: logger,
|
||||
httpServer: httpServer,
|
||||
}, nil
|
||||
} else {
|
||||
// Create stdio server
|
||||
stdioServer := server.NewStdioServer(mcpServer)
|
||||
|
||||
return &K8sGptMCPServer{
|
||||
server: mcpServer,
|
||||
port: port,
|
||||
aiProvider: aiProvider,
|
||||
useHTTP: useHTTP,
|
||||
logger: logger,
|
||||
stdioServer: stdioServer,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the MCP server
|
||||
func (s *MCPServer) Start() error {
|
||||
func (s *K8sGptMCPServer) Start() error {
|
||||
if s.server == nil {
|
||||
return fmt.Errorf("server not initialized")
|
||||
}
|
||||
|
||||
// Register analyze tool
|
||||
if err := s.server.RegisterTool("analyze", "Analyze Kubernetes resources", s.handleAnalyze); err != nil {
|
||||
return fmt.Errorf("failed to register analyze tool: %v", err)
|
||||
// Register prompts
|
||||
if err := s.registerPrompts(); err != nil {
|
||||
return fmt.Errorf("failed to register prompts: %v", err)
|
||||
}
|
||||
|
||||
// Register cluster info tool
|
||||
if err := s.server.RegisterTool("cluster-info", "Get Kubernetes cluster information", s.handleClusterInfo); err != nil {
|
||||
return fmt.Errorf("failed to register cluster-info tool: %v", err)
|
||||
}
|
||||
|
||||
// Register config tool
|
||||
if err := s.server.RegisterTool("config", "Configure K8sGPT settings", s.handleConfig); err != nil {
|
||||
return fmt.Errorf("failed to register config tool: %v", err)
|
||||
}
|
||||
|
||||
// Register resources
|
||||
if err := s.registerResources(); err != nil {
|
||||
return fmt.Errorf("failed to register resources: %v", err)
|
||||
}
|
||||
|
||||
// Register prompts
|
||||
if err := s.registerPrompts(); err != nil {
|
||||
return fmt.Errorf("failed to register prompts: %v", err)
|
||||
// Start the server based on transport type
|
||||
if s.useHTTP {
|
||||
// HTTP server is already running in a goroutine
|
||||
return nil
|
||||
} else {
|
||||
// Start stdio server (this will block)
|
||||
return server.ServeStdio(s.server)
|
||||
}
|
||||
}
|
||||
|
||||
// Start the server (this will block)
|
||||
if err := s.server.Serve(); err != nil {
|
||||
s.logger.Error("Error starting MCP server", zap.Error(err))
|
||||
}
|
||||
func (s *K8sGptMCPServer) registerToolsAndResources() error {
|
||||
// Register analyze tool with proper JSON schema
|
||||
analyzeTool := mcp.NewTool("analyze",
|
||||
mcp.WithDescription("Analyze Kubernetes resources for issues and problems"),
|
||||
mcp.WithString("namespace",
|
||||
mcp.Description("Kubernetes namespace to analyze (empty for all namespaces)"),
|
||||
),
|
||||
mcp.WithString("backend",
|
||||
mcp.Description("AI backend to use for analysis (e.g., openai, azure, localai)"),
|
||||
),
|
||||
mcp.WithBoolean("explain",
|
||||
mcp.Description("Provide detailed explanations for issues"),
|
||||
),
|
||||
mcp.WithArray("filters",
|
||||
mcp.Description("Provide filters to narrow down the analysis (e.g. ['Pods', 'Deployments'])"),
|
||||
),
|
||||
)
|
||||
s.server.AddTool(analyzeTool, s.handleAnalyze)
|
||||
|
||||
// Register cluster info tool (no parameters needed)
|
||||
clusterInfoTool := mcp.NewTool("cluster-info",
|
||||
mcp.WithDescription("Get Kubernetes cluster information and version"),
|
||||
)
|
||||
s.server.AddTool(clusterInfoTool, s.handleClusterInfo)
|
||||
|
||||
// Register config tool with proper JSON schema
|
||||
configTool := mcp.NewTool("config",
|
||||
mcp.WithDescription("Configure K8sGPT settings including custom analyzers and cache"),
|
||||
mcp.WithObject("customAnalyzers",
|
||||
mcp.Description("Custom analyzer configurations"),
|
||||
mcp.Properties(map[string]any{
|
||||
"name": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Name of the custom analyzer",
|
||||
},
|
||||
"connection": map[string]any{
|
||||
"type": "object",
|
||||
"properties": map[string]any{
|
||||
"url": map[string]any{
|
||||
"type": "string",
|
||||
"description": "URL of the custom analyzer service",
|
||||
},
|
||||
"port": map[string]any{
|
||||
"type": "integer",
|
||||
"description": "Port of the custom analyzer service",
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
),
|
||||
mcp.WithObject("cache",
|
||||
mcp.Description("Cache configuration"),
|
||||
mcp.Properties(map[string]any{
|
||||
"type": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Cache type (s3, azure, gcs)",
|
||||
"enum": []string{"s3", "azure", "gcs"},
|
||||
},
|
||||
"bucketName": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Bucket name for S3/GCS cache",
|
||||
},
|
||||
"region": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Region for S3/GCS cache",
|
||||
},
|
||||
"endpoint": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Custom endpoint for S3 cache",
|
||||
},
|
||||
"insecure": map[string]any{
|
||||
"type": "boolean",
|
||||
"description": "Use insecure connection for cache",
|
||||
},
|
||||
"storageAccount": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Storage account for Azure cache",
|
||||
},
|
||||
"containerName": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Container name for Azure cache",
|
||||
},
|
||||
"projectId": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Project ID for GCS cache",
|
||||
},
|
||||
}),
|
||||
),
|
||||
)
|
||||
s.server.AddTool(configTool, s.handleConfig)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -116,6 +233,7 @@ type AnalyzeRequest struct {
|
||||
InteractiveMode bool `json:"interactiveMode,omitempty"`
|
||||
CustomHeaders []string `json:"customHeaders,omitempty"`
|
||||
WithStats bool `json:"withStats,omitempty"`
|
||||
Anonymize bool `json:"anonymize,omitempty"`
|
||||
}
|
||||
|
||||
// AnalyzeResponse represents the output of the analyze tool
|
||||
@@ -163,62 +281,74 @@ type ConfigResponse struct {
|
||||
}
|
||||
|
||||
// handleAnalyze handles the analyze tool
|
||||
func (s *MCPServer) handleAnalyze(ctx context.Context, request *AnalyzeRequest) (*mcp_golang.ToolResponse, error) {
|
||||
// Get stored configuration
|
||||
var configAI ai.AIConfiguration
|
||||
if err := viper.UnmarshalKey("ai", &configAI); err != nil {
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(fmt.Sprintf("Failed to load AI configuration: %v", err))), nil
|
||||
func (s *K8sGptMCPServer) handleAnalyze(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
|
||||
|
||||
var req AnalyzeRequest
|
||||
if err := request.BindArguments(&req); err != nil {
|
||||
return mcp.NewToolResultErrorf("Failed to parse request arguments: %v", err), nil
|
||||
}
|
||||
// Use stored configuration if not specified in request
|
||||
if request.Backend == "" {
|
||||
if configAI.DefaultProvider != "" {
|
||||
request.Backend = configAI.DefaultProvider
|
||||
} else if len(configAI.Providers) > 0 {
|
||||
request.Backend = configAI.Providers[0].Name
|
||||
|
||||
if req.Backend == "" {
|
||||
if s.aiProvider.Name != "" {
|
||||
req.Backend = s.aiProvider.Name
|
||||
} else {
|
||||
request.Backend = "openai" // fallback default
|
||||
req.Backend = "openai" // fallback default
|
||||
}
|
||||
}
|
||||
|
||||
request.Explain = true
|
||||
// Get stored filters if not specified
|
||||
if len(request.Filters) == 0 {
|
||||
request.Filters = viper.GetStringSlice("active_filters")
|
||||
if len(req.Filters) == 0 {
|
||||
req.Filters = viper.GetStringSlice("active_filters")
|
||||
}
|
||||
|
||||
// Validate MaxConcurrency to prevent excessive memory allocation
|
||||
request.MaxConcurrency = validateMaxConcurrency(request.MaxConcurrency)
|
||||
req.MaxConcurrency = validateMaxConcurrency(req.MaxConcurrency)
|
||||
|
||||
// Create a new analysis with the request parameters
|
||||
analysis, err := analysis.NewAnalysis(
|
||||
request.Backend,
|
||||
request.Language,
|
||||
request.Filters,
|
||||
request.Namespace,
|
||||
request.LabelSelector,
|
||||
request.NoCache,
|
||||
request.Explain,
|
||||
request.MaxConcurrency,
|
||||
request.WithDoc,
|
||||
request.InteractiveMode,
|
||||
request.CustomHeaders,
|
||||
request.WithStats,
|
||||
req.Backend,
|
||||
req.Language,
|
||||
req.Filters,
|
||||
req.Namespace,
|
||||
req.LabelSelector,
|
||||
req.NoCache,
|
||||
req.Explain,
|
||||
req.MaxConcurrency,
|
||||
req.WithDoc,
|
||||
req.InteractiveMode,
|
||||
req.CustomHeaders,
|
||||
req.WithStats,
|
||||
)
|
||||
if err != nil {
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(fmt.Sprintf("Failed to create analysis: %v", err))), nil
|
||||
return mcp.NewToolResultErrorf("Failed to create analysis: %v", err), nil
|
||||
}
|
||||
defer analysis.Close()
|
||||
|
||||
// Run the analysis
|
||||
analysis.RunAnalysis()
|
||||
if req.Explain {
|
||||
|
||||
// Get the output
|
||||
output, err := analysis.PrintOutput("json")
|
||||
if err != nil {
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(fmt.Sprintf("Failed to print output: %v", err))), nil
|
||||
var output string
|
||||
err := analysis.GetAIResults(output, req.Anonymize)
|
||||
if err != nil {
|
||||
return mcp.NewToolResultErrorf("Failed to get results from AI: %v", err), nil
|
||||
}
|
||||
|
||||
// Convert results to JSON string using PrintOutput
|
||||
outputBytes, err := analysis.PrintOutput("text")
|
||||
if err != nil {
|
||||
return mcp.NewToolResultErrorf("Failed to convert results to string: %v", err), nil
|
||||
}
|
||||
plainText := stripANSI(string(outputBytes))
|
||||
return mcp.NewToolResultText(plainText), nil
|
||||
} else {
|
||||
// Get the output
|
||||
output, err := analysis.PrintOutput("json")
|
||||
if err != nil {
|
||||
return mcp.NewToolResultErrorf("Failed to print output: %v", err), nil
|
||||
}
|
||||
return mcp.NewToolResultText(string(output)), nil
|
||||
}
|
||||
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(string(output))), nil
|
||||
}
|
||||
|
||||
// validateMaxConcurrency validates and bounds the MaxConcurrency parameter
|
||||
@@ -233,25 +363,31 @@ func validateMaxConcurrency(maxConcurrency int) int {
|
||||
}
|
||||
|
||||
// handleClusterInfo handles the cluster-info tool
|
||||
func (s *MCPServer) handleClusterInfo(ctx context.Context, request *ClusterInfoRequest) (*mcp_golang.ToolResponse, error) {
|
||||
func (s *K8sGptMCPServer) handleClusterInfo(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
|
||||
// Create a new Kubernetes client
|
||||
client, err := kubernetes.NewClient("", "")
|
||||
if err != nil {
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(fmt.Sprintf("failed to create Kubernetes client: %v", err))), nil
|
||||
return mcp.NewToolResultErrorf("failed to create Kubernetes client: %v", err), nil
|
||||
}
|
||||
|
||||
// Get cluster info from the client
|
||||
version, err := client.Client.Discovery().ServerVersion()
|
||||
if err != nil {
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(fmt.Sprintf("failed to get cluster version: %v", err))), nil
|
||||
return mcp.NewToolResultErrorf("failed to get cluster version: %v", err), nil
|
||||
}
|
||||
|
||||
info := fmt.Sprintf("Kubernetes %s", version.GitVersion)
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(info)), nil
|
||||
return mcp.NewToolResultText(info), nil
|
||||
}
|
||||
|
||||
// handleConfig handles the config tool
|
||||
func (s *MCPServer) handleConfig(ctx context.Context, request *ConfigRequest) (*mcp_golang.ToolResponse, error) {
|
||||
func (s *K8sGptMCPServer) handleConfig(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
|
||||
// Parse request arguments
|
||||
var req ConfigRequest
|
||||
if err := request.BindArguments(&req); err != nil {
|
||||
return mcp.NewToolResultErrorf("Failed to parse request arguments: %v", err), nil
|
||||
}
|
||||
|
||||
// Create a new config handler
|
||||
handler := &config.Handler{}
|
||||
|
||||
@@ -261,8 +397,8 @@ func (s *MCPServer) handleConfig(ctx context.Context, request *ConfigRequest) (*
|
||||
}
|
||||
|
||||
// Add custom analyzers if present
|
||||
if len(request.CustomAnalyzers) > 0 {
|
||||
for _, ca := range request.CustomAnalyzers {
|
||||
if len(req.CustomAnalyzers) > 0 {
|
||||
for _, ca := range req.CustomAnalyzers {
|
||||
addConfigReq.CustomAnalyzers = append(addConfigReq.CustomAnalyzers, &schemav1.CustomAnalyzer{
|
||||
Name: ca.Name,
|
||||
Connection: &schemav1.Connection{
|
||||
@@ -274,31 +410,31 @@ func (s *MCPServer) handleConfig(ctx context.Context, request *ConfigRequest) (*
|
||||
}
|
||||
|
||||
// Add cache configuration if present
|
||||
if request.Cache.Type != "" {
|
||||
if req.Cache.Type != "" {
|
||||
cacheConfig := &schemav1.Cache{}
|
||||
switch request.Cache.Type {
|
||||
switch req.Cache.Type {
|
||||
case "s3":
|
||||
cacheConfig.CacheType = &schemav1.Cache_S3Cache{
|
||||
S3Cache: &schemav1.S3Cache{
|
||||
BucketName: request.Cache.BucketName,
|
||||
Region: request.Cache.Region,
|
||||
Endpoint: request.Cache.Endpoint,
|
||||
Insecure: request.Cache.Insecure,
|
||||
BucketName: req.Cache.BucketName,
|
||||
Region: req.Cache.Region,
|
||||
Endpoint: req.Cache.Endpoint,
|
||||
Insecure: req.Cache.Insecure,
|
||||
},
|
||||
}
|
||||
case "azure":
|
||||
cacheConfig.CacheType = &schemav1.Cache_AzureCache{
|
||||
AzureCache: &schemav1.AzureCache{
|
||||
StorageAccount: request.Cache.StorageAccount,
|
||||
ContainerName: request.Cache.ContainerName,
|
||||
StorageAccount: req.Cache.StorageAccount,
|
||||
ContainerName: req.Cache.ContainerName,
|
||||
},
|
||||
}
|
||||
case "gcs":
|
||||
cacheConfig.CacheType = &schemav1.Cache_GcsCache{
|
||||
GcsCache: &schemav1.GCSCache{
|
||||
BucketName: request.Cache.BucketName,
|
||||
Region: request.Cache.Region,
|
||||
ProjectId: request.Cache.ProjectId,
|
||||
BucketName: req.Cache.BucketName,
|
||||
Region: req.Cache.Region,
|
||||
ProjectId: req.Cache.ProjectId,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -307,27 +443,30 @@ func (s *MCPServer) handleConfig(ctx context.Context, request *ConfigRequest) (*
|
||||
|
||||
// Apply the configuration using the shared function
|
||||
if err := handler.ApplyConfig(ctx, addConfigReq); err != nil {
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(fmt.Sprintf("Failed to add config: %v", err))), nil
|
||||
return mcp.NewToolResultErrorf("Failed to add config: %v", err), nil
|
||||
}
|
||||
|
||||
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent("Successfully added configuration")), nil
|
||||
return mcp.NewToolResultText("Successfully added configuration"), nil
|
||||
}
|
||||
|
||||
// registerPrompts registers the prompts for the MCP server
|
||||
func (s *MCPServer) registerPrompts() error {
|
||||
func (s *K8sGptMCPServer) registerPrompts() error {
|
||||
// Register any prompts needed for the MCP server
|
||||
return nil
|
||||
}
|
||||
|
||||
// registerResources registers the resources for the MCP server
|
||||
func (s *MCPServer) registerResources() error {
|
||||
if err := s.server.RegisterResource("cluster-info", "Get cluster information", "Get information about the Kubernetes cluster", "text", s.getClusterInfo); err != nil {
|
||||
return fmt.Errorf("failed to register cluster-info resource: %v", err)
|
||||
}
|
||||
func (s *K8sGptMCPServer) registerResources() error {
|
||||
clusterInfoResource := mcp.NewResource("cluster-info", "cluster-info",
|
||||
mcp.WithResourceDescription("Get information about the Kubernetes cluster"),
|
||||
mcp.WithMIMEType("application/json"),
|
||||
)
|
||||
|
||||
s.server.AddResource(clusterInfoResource, s.getClusterInfo)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MCPServer) getClusterInfo(ctx context.Context) (*mcp_golang.ResourceResponse, error) {
|
||||
func (s *K8sGptMCPServer) getClusterInfo(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
|
||||
// Create a new Kubernetes client
|
||||
client, err := kubernetes.NewClient("", "")
|
||||
if err != nil {
|
||||
@@ -346,24 +485,44 @@ func (s *MCPServer) getClusterInfo(ctx context.Context) (*mcp_golang.ResourceRes
|
||||
"gitVersion": version.GitVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return mcp_golang.NewResourceResponse(
|
||||
mcp_golang.NewTextEmbeddedResource(
|
||||
"cluster-info",
|
||||
"Failed to marshal cluster info",
|
||||
"text/plain",
|
||||
),
|
||||
), nil
|
||||
return []mcp.ResourceContents{
|
||||
&mcp.TextResourceContents{
|
||||
URI: "cluster-info",
|
||||
MIMEType: "text/plain",
|
||||
Text: "Failed to marshal cluster info",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
return mcp_golang.NewResourceResponse(
|
||||
mcp_golang.NewTextEmbeddedResource(
|
||||
"cluster-info",
|
||||
string(data),
|
||||
"application/json",
|
||||
),
|
||||
), nil
|
||||
|
||||
return []mcp.ResourceContents{
|
||||
&mcp.TextResourceContents{
|
||||
URI: "cluster-info",
|
||||
MIMEType: "application/json",
|
||||
Text: string(data),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the MCP server and releases resources
|
||||
func (s *MCPServer) Close() error {
|
||||
func (s *K8sGptMCPServer) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// zapLoggerAdapter adapts zap.Logger to the interface expected by mark3labs/mcp-go
|
||||
type zapLoggerAdapter struct {
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func (z *zapLoggerAdapter) Infof(format string, v ...any) {
|
||||
z.logger.Info(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
func (z *zapLoggerAdapter) Errorf(format string, v ...any) {
|
||||
z.logger.Error(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
// stripANSI removes ANSI escape sequences from a string
|
||||
func stripANSI(input string) string {
|
||||
re := regexp.MustCompile(`\x1b\[[0-9;]*m`)
|
||||
return re.ReplaceAllString(input, "")
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"testing"
|
||||
@@ -78,14 +79,14 @@ func TestMCPServerCreation(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test HTTP mode
|
||||
mcpServer, err := NewMCPServer("8089", aiProvider, true, logger)
|
||||
mcpServer, err := NewMCPServer("8088", aiProvider, true, logger)
|
||||
assert.NoError(t, err, "Should be able to create MCP server with HTTP transport")
|
||||
assert.NotNil(t, mcpServer, "MCP server should not be nil")
|
||||
assert.True(t, mcpServer.useHTTP, "MCP server should be in HTTP mode")
|
||||
assert.Equal(t, "8089", mcpServer.port, "Port should be set correctly")
|
||||
assert.Equal(t, "8088", mcpServer.port, "Port should be set correctly")
|
||||
|
||||
// Test stdio mode
|
||||
mcpServerStdio, err := NewMCPServer("8089", aiProvider, false, logger)
|
||||
mcpServerStdio, err := NewMCPServer("8088", aiProvider, false, logger)
|
||||
assert.NoError(t, err, "Should be able to create MCP server with stdio transport")
|
||||
assert.NotNil(t, mcpServerStdio, "MCP server should not be nil")
|
||||
assert.False(t, mcpServerStdio.useHTTP, "MCP server should be in stdio mode")
|
||||
@@ -107,27 +108,83 @@ func TestMCPServerBasicHTTP(t *testing.T) {
|
||||
Model: "test-model",
|
||||
}
|
||||
|
||||
mcpServer, err := NewMCPServer("8089", aiProvider, true, logger)
|
||||
mcpServer, err := NewMCPServer("8091", aiProvider, true, logger)
|
||||
assert.NoError(t, err, "Should be able to create MCP server")
|
||||
|
||||
// Start the MCP server in a goroutine
|
||||
go func() {
|
||||
err := mcpServer.Start()
|
||||
// Note: Start() might return an error when the server is stopped, which is expected
|
||||
if err != nil {
|
||||
logger.Info("MCP server stopped", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
// For HTTP mode, the server is already started in NewMCPServer
|
||||
// No need to call Start() as it's already running in a goroutine
|
||||
|
||||
// Wait for the server to start
|
||||
err = waitForPort("localhost:8089", 10*time.Second)
|
||||
err = waitForPort("localhost:8091", 10*time.Second)
|
||||
if err != nil {
|
||||
t.Skipf("MCP server did not start within timeout: %v", err)
|
||||
}
|
||||
|
||||
// Test basic connectivity to the MCP endpoint
|
||||
// The MCP HTTP transport uses a single POST endpoint for all requests
|
||||
resp, err := http.Post("http://localhost:8089/mcp", "application/json", bytes.NewBufferString(`{"jsonrpc":"2.0","id":1,"method":"tools/list"}`))
|
||||
// First, initialize the session
|
||||
initRequest := `{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "initialize",
|
||||
"params": {
|
||||
"protocolVersion": "2025-03-26",
|
||||
"capabilities": {
|
||||
"tools": {},
|
||||
"resources": {},
|
||||
"prompts": {}
|
||||
},
|
||||
"clientInfo": {
|
||||
"name": "test-client",
|
||||
"version": "1.0.0"
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
initResp, err := http.Post("http://localhost:8091/mcp", "application/json", bytes.NewBufferString(initRequest))
|
||||
if err != nil {
|
||||
t.Logf("Initialize request failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err := initResp.Body.Close(); err != nil {
|
||||
t.Logf("Error closing init response body: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Read initialization response
|
||||
initBody, err := io.ReadAll(initResp.Body)
|
||||
if err != nil {
|
||||
t.Logf("Failed to read init response body: %v", err)
|
||||
} else {
|
||||
t.Logf("Init response status: %d, body: %s", initResp.StatusCode, string(initBody))
|
||||
}
|
||||
|
||||
// Extract session ID from response headers if present
|
||||
sessionID := initResp.Header.Get("Mcp-Session-Id")
|
||||
if sessionID == "" {
|
||||
t.Logf("No session ID in response headers")
|
||||
}
|
||||
|
||||
// Now test tools/list with session ID if available
|
||||
headers := map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json,text/event-stream",
|
||||
}
|
||||
if sessionID != "" {
|
||||
headers["Mcp-Session-Id"] = sessionID
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", "http://localhost:8091/mcp", bytes.NewBufferString(`{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}`))
|
||||
if err != nil {
|
||||
t.Logf("Failed to create request: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for key, value := range headers {
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
t.Logf("MCP endpoint test skipped (server might not be fully ready): %v", err)
|
||||
return
|
||||
@@ -139,6 +196,14 @@ func TestMCPServerBasicHTTP(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
// Read response body for debugging
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Logf("Failed to read response body: %v", err)
|
||||
} else {
|
||||
t.Logf("Response status: %d, body: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// Accept both 200 and 404 as valid responses (404 means endpoint not implemented)
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound {
|
||||
t.Errorf("MCP endpoint returned unexpected status: %d", resp.StatusCode)
|
||||
@@ -168,13 +233,8 @@ func TestMCPServerToolCall(t *testing.T) {
|
||||
mcpServer, err := NewMCPServer("8090", aiProvider, true, logger)
|
||||
assert.NoError(t, err, "Should be able to create MCP server")
|
||||
|
||||
// Start the MCP server in a goroutine
|
||||
go func() {
|
||||
err := mcpServer.Start()
|
||||
if err != nil {
|
||||
logger.Info("MCP server stopped", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
// For HTTP mode, the server is already started in NewMCPServer
|
||||
// No need to call Start() as it's already running in a goroutine
|
||||
|
||||
// Wait for the server to start
|
||||
err = waitForPort("localhost:8090", 10*time.Second)
|
||||
@@ -182,6 +242,39 @@ func TestMCPServerToolCall(t *testing.T) {
|
||||
t.Skipf("MCP server did not start within timeout: %v", err)
|
||||
}
|
||||
|
||||
// First, initialize the session
|
||||
initRequest := `{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "initialize",
|
||||
"params": {
|
||||
"protocolVersion": "2025-03-26",
|
||||
"capabilities": {
|
||||
"tools": {},
|
||||
"resources": {},
|
||||
"prompts": {}
|
||||
},
|
||||
"clientInfo": {
|
||||
"name": "test-client",
|
||||
"version": "1.0.0"
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
initResp, err := http.Post("http://localhost:8090/mcp", "application/json", bytes.NewBufferString(initRequest))
|
||||
if err != nil {
|
||||
t.Logf("Initialize request failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err := initResp.Body.Close(); err != nil {
|
||||
t.Logf("Error closing init response body: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Extract session ID from response headers if present
|
||||
sessionID := initResp.Header.Get("Mcp-Session-Id")
|
||||
|
||||
// Test calling the analyze tool with proper JSON-RPC format
|
||||
analyzeRequest := `{
|
||||
"jsonrpc": "2.0",
|
||||
@@ -199,7 +292,21 @@ func TestMCPServerToolCall(t *testing.T) {
|
||||
}
|
||||
}`
|
||||
|
||||
resp, err := http.Post("http://localhost:8090/mcp", "application/json", bytes.NewBufferString(analyzeRequest))
|
||||
// Create request with session ID if available
|
||||
req, err := http.NewRequest("POST", "http://localhost:8090/mcp", bytes.NewBufferString(analyzeRequest))
|
||||
if err != nil {
|
||||
t.Logf("Failed to create request: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Accept", "application/json,text/event-stream")
|
||||
if sessionID != "" {
|
||||
req.Header.Set("Mcp-Session-Id", sessionID)
|
||||
}
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
t.Logf("Analyze tool call test skipped (server might not be fully ready): %v", err)
|
||||
return
|
||||
|
Reference in New Issue
Block a user