From 2ccd716a6824dd34ef667895c2287142b23fbd00 Mon Sep 17 00:00:00 2001
From: Alon Girmonsky <1990761+alongir@users.noreply.github.com>
Date: Fri, 6 Feb 2026 10:39:42 -0800
Subject: [PATCH] Add MCP registry metadata for official registry submission
 (#1835)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Add MCP (Model Context Protocol) server command

Implement `kubeshark mcp` command that runs an MCP server over stdio,
enabling AI assistants to query Kubeshark's network visibility data.

Features:
- MCP protocol implementation (JSON-RPC 2.0 over stdio)
- Dynamic tool discovery from Hub's /api/mcp endpoint
- Local cluster management tools (check_kubeshark_status, start_kubeshark,
  stop_kubeshark)
- --url flag for direct connection to an existing Kubeshark deployment
- --kubeconfig flag for proxy mode with kubectl
- --allow-destructive flag to enable start/stop operations (safe by default)
- --list-tools flag to display available tools
- --mcp-config flag to generate MCP client configuration
- 5-minute cache TTL for Hub tools/prompts
- Prompts for common analysis tasks

* Address code review comments for MCP implementation

- Add 30s timeout to HTTP client to prevent hanging requests
- Add scanner.Err() check after the stdin processing loop
- Close HTTP response bodies to prevent resource leaks
- Add goroutine to wait on the started process to prevent zombies
- Simplify polling loop by removing ineffective context check
- Advertise check_kubeshark_status in URL mode (was callable but hidden)
- Update documentation to clarify that URL mode only disables start/stop

* Fix lint errors in mcpRunner.go

- Use type conversion instead of struct literals for hubMCPTool -> mcpTool
  and hubMCPPromptArg -> mcpPromptArg (S1016 gosimple)
- Lowercase error string to follow Go conventions (ST1005 staticcheck)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5

* Add MCP server unit tests

Comprehensive unit tests for the
MCP server implementation:
- Protocol tests (initialize, tools/list, tools/call, prompts/list, prompts/get)
- Tool tests (check_kubeshark_status, start_kubeshark, stop_kubeshark)
- Hub integration tests (tool fetching, caching, prompt handling)
- Error handling tests
- Edge case tests

* Fix MCP unit tests to use correct /tools/call endpoint

- Update all Hub tool tests to use the POST /tools/call endpoint instead of
  individual paths like /workloads, /calls, /stats
- Verify arguments in the POST body instead of URL query parameters
- Add newMockHubHandler helper for proper Hub endpoint mocking
- Split TestMCP_ToolsList into three tests:
  - TestMCP_ToolsList_CLIOnly: tests without a Hub backend
  - TestMCP_ToolsList_WithDestructive: tests with the destructive flag
  - TestMCP_ToolsList_WithHubBackend: tests with a mock Hub providing tools
- Fix TestMCP_FullConversation to mock the Hub MCP endpoint correctly
- Rename URL encoding tests for clarity
- All tests now correctly reflect the implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5

* Simplify MCP unit tests

- Remove section header comments (10 headers)
- Consolidate similar tests using table-driven patterns
- Simplify test assertions with more concise checks
- Combine edge case tests into a single test function
- Reduce verbose test structures

Total reduction: 1477 → 495 lines (66%). All 24 tests still pass.
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5

* Add MCP integration test framework

Add integration tests that run against a real Kubernetes cluster:
- MCP protocol tests (initialize, tools/list, prompts/list)
- Cluster management tests (check_kubeshark_status, start_kubeshark,
  stop_kubeshark)
- Full lifecycle test (check -> start -> check -> stop -> check)
- API tools tests (list_workloads, list_api_calls, get_api_stats)

Also includes:
- Makefile targets for running integration tests
- Test helper functions (startMCPSession, cleanupKubeshark, etc.)
- Documentation (README.md, TEMPLATE.md, ISSUE_TEMPLATE.md)

* Address review comments on integration tests

Makefile:
- Use unique temporary files (mktemp) instead of a shared
  /tmp/integration-test.log to prevent race conditions when multiple test
  targets run concurrently
- Remove redundant test-integration-verbose target (test-integration already
  uses -v)
- Add cleanup (rm -f) for temporary log files

integration/mcp_test.go:
- Capture stderr from the MCP server for debugging failures
- Add getStderr() method to mcpSession for accessing captured stderr
- Fix potential goroutine leak by adding return statements after t.Fatalf
- Remove t.Run subtests in TestMCP_APIToolsRequireKubeshark to clarify
  sequential execution with a shared session
- Fix benchmark to use the getKubesharkBinary helper for consistency
- Add Kubernetes cluster check to the benchmark (graceful skip)
- Add proper error handling for pipe creation in the benchmark
- Remove unnecessary bytes import workaround (now actually used for stderr)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5

* Simplify and clean up MCP integration tests

- Remove unrelated L4 viewer files (1239 lines)
- Remove template/issue documentation files (419 lines)
- Trim README to essential content only
- Remove TEMPLATE comments from common_test.go
- Add initialize() helper to reduce test boilerplate
- Add hasKubernetesCluster() helper for benchmarks
- Simplify all test functions with consistent patterns

Total reduction: 2964 → 866 lines (71%)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5

* Add MCP registry metadata for official registry submission

Add metadata files for submitting the Kubeshark MCP server to the official
MCP registry at registry.modelcontextprotocol.io:
- mcp/server.json: Registry metadata with tools, prompts, and configuration
- mcp/README.md: MCP server documentation and usage guide

---------

Co-authored-by: Claude Opus 4.5
---
 Makefile                   |  63 +++++
 cmd/mcp_test.go            | 495 ++++++++++++++++++++++++++++++++++
 integration/README.md      |  57 ++++
 integration/common_test.go | 217 +++++++++++++++
 integration/mcp_test.go    | 529 +++++++++++++++++++++++++++++++++++++
 mcp/README.md              | 185 +++++++++++++
 mcp/server.json            | 205 ++++++++++++++
 7 files changed, 1751 insertions(+)
 create mode 100644 cmd/mcp_test.go
 create mode 100644 integration/README.md
 create mode 100644 integration/common_test.go
 create mode 100644 integration/mcp_test.go
 create mode 100644 mcp/README.md
 create mode 100644 mcp/server.json

diff --git a/Makefile b/Makefile
index 010e590a7..c668d9752 100644
--- a/Makefile
+++ b/Makefile
@@ -74,6 +74,69 @@ clean: ## Clean all build artifacts.
 test: ## Run cli tests.
 	@go test ./... -coverpkg=./... -race -coverprofile=coverage.out -covermode=atomic
 
+test-integration: ## Run integration tests (requires Kubernetes cluster).
+	@echo "Running integration tests..."
+	@LOG_FILE=$$(mktemp /tmp/integration-test.XXXXXX.log); \
+	go test -tags=integration -timeout $${INTEGRATION_TIMEOUT:-5m} -v ./integration/... 2>&1 | tee $$LOG_FILE; \
+	status=$$?; \
+	echo ""; \
+	echo "========================================"; \
+	echo " INTEGRATION TEST SUMMARY"; \
+	echo "========================================"; \
+	grep -E "^(--- PASS|--- FAIL|--- SKIP)" $$LOG_FILE || true; \
+	echo "----------------------------------------"; \
+	pass=$$(grep -c "^--- PASS" $$LOG_FILE 2>/dev/null || true); \
+	fail=$$(grep -c "^--- FAIL" $$LOG_FILE 2>/dev/null || true); \
+	skip=$$(grep -c "^--- SKIP" $$LOG_FILE 2>/dev/null || true); \
+	echo "PASSED: $${pass:-0}"; \
+	echo "FAILED: $${fail:-0}"; \
+	echo "SKIPPED: $${skip:-0}"; \
+	echo "========================================"; \
+	rm -f $$LOG_FILE; \
+	exit $$status
+
+test-integration-mcp: ## Run only MCP integration tests.
+	@echo "Running MCP integration tests..."
+	@LOG_FILE=$$(mktemp /tmp/integration-test.XXXXXX.log); \
+	go test -tags=integration -timeout $${INTEGRATION_TIMEOUT:-5m} -v ./integration/ -run "MCP" 2>&1 | tee $$LOG_FILE; \
+	status=$$?; \
+	echo ""; \
+	echo "========================================"; \
+	echo " INTEGRATION TEST SUMMARY"; \
+	echo "========================================"; \
+	grep -E "^(--- PASS|--- FAIL|--- SKIP)" $$LOG_FILE || true; \
+	echo "----------------------------------------"; \
+	pass=$$(grep -c "^--- PASS" $$LOG_FILE 2>/dev/null || true); \
+	fail=$$(grep -c "^--- FAIL" $$LOG_FILE 2>/dev/null || true); \
+	skip=$$(grep -c "^--- SKIP" $$LOG_FILE 2>/dev/null || true); \
+	echo "PASSED: $${pass:-0}"; \
+	echo "FAILED: $${fail:-0}"; \
+	echo "SKIPPED: $${skip:-0}"; \
+	echo "========================================"; \
+	rm -f $$LOG_FILE; \
+	exit $$status
+
+test-integration-short: ## Run quick integration tests (skips long-running tests).
+	@echo "Running quick integration tests..."
+	@LOG_FILE=$$(mktemp /tmp/integration-test.XXXXXX.log); \
+	go test -tags=integration -timeout $${INTEGRATION_TIMEOUT:-2m} -short -v ./integration/...
2>&1 | tee $$LOG_FILE; \ + status=$$?; \ + echo ""; \ + echo "========================================"; \ + echo " INTEGRATION TEST SUMMARY"; \ + echo "========================================"; \ + grep -E "^(--- PASS|--- FAIL|--- SKIP)" $$LOG_FILE || true; \ + echo "----------------------------------------"; \ + pass=$$(grep -c "^--- PASS" $$LOG_FILE 2>/dev/null || true); \ + fail=$$(grep -c "^--- FAIL" $$LOG_FILE 2>/dev/null || true); \ + skip=$$(grep -c "^--- SKIP" $$LOG_FILE 2>/dev/null || true); \ + echo "PASSED: $${pass:-0}"; \ + echo "FAILED: $${fail:-0}"; \ + echo "SKIPPED: $${skip:-0}"; \ + echo "========================================"; \ + rm -f $$LOG_FILE; \ + exit $$status + lint: ## Lint the source code. golangci-lint run diff --git a/cmd/mcp_test.go b/cmd/mcp_test.go new file mode 100644 index 000000000..be6600e52 --- /dev/null +++ b/cmd/mcp_test.go @@ -0,0 +1,495 @@ +package cmd + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func newTestMCPServer() *mcpServer { + return &mcpServer{httpClient: &http.Client{}, stdin: &bytes.Buffer{}, stdout: &bytes.Buffer{}} +} + +func sendRequest(s *mcpServer, method string, id any, params any) string { + req := jsonRPCRequest{ + JSONRPC: "2.0", + ID: id, + Method: method, + } + if params != nil { + paramsBytes, _ := json.Marshal(params) + req.Params = paramsBytes + } + + s.handleRequest(&req) + + output := s.stdout.(*bytes.Buffer).String() + s.stdout.(*bytes.Buffer).Reset() + return output +} + +func parseResponse(t *testing.T, output string) jsonRPCResponse { + var resp jsonRPCResponse + if err := json.Unmarshal([]byte(strings.TrimSpace(output)), &resp); err != nil { + t.Fatalf("Failed to parse response: %v\nOutput: %s", err, output) + } + return resp +} + +func TestMCP_Initialize(t *testing.T) { + s := newTestMCPServer() + resp := parseResponse(t, sendRequest(s, "initialize", 1, nil)) + + if resp.ID != float64(1) || resp.Error != nil { + 
t.Fatalf("Expected ID 1 with no error, got ID=%v, error=%v", resp.ID, resp.Error) + } + + result := resp.Result.(map[string]any) + if result["protocolVersion"] != "2024-11-05" { + t.Errorf("Expected protocolVersion 2024-11-05, got %v", result["protocolVersion"]) + } + if result["serverInfo"].(map[string]any)["name"] != "kubeshark-mcp" { + t.Error("Expected server name kubeshark-mcp") + } + if !strings.Contains(result["instructions"].(string), "check_kubeshark_status") { + t.Error("Instructions should mention check_kubeshark_status") + } + if _, ok := result["capabilities"].(map[string]any)["prompts"]; !ok { + t.Error("Expected prompts capability") + } +} + +func TestMCP_Ping(t *testing.T) { + resp := parseResponse(t, sendRequest(newTestMCPServer(), "ping", 42, nil)) + if resp.ID != float64(42) || resp.Error != nil || len(resp.Result.(map[string]any)) != 0 { + t.Errorf("Expected ID 42, no error, empty result") + } +} + +func TestMCP_InitializedNotification(t *testing.T) { + s := newTestMCPServer() + for _, method := range []string{"initialized", "notifications/initialized"} { + if output := sendRequest(s, method, nil, nil); output != "" { + t.Errorf("Expected no output for %s, got: %s", method, output) + } + } +} + +func TestMCP_UnknownMethod(t *testing.T) { + resp := parseResponse(t, sendRequest(newTestMCPServer(), "unknown/method", 1, nil)) + if resp.Error == nil || resp.Error.Code != -32601 { + t.Fatalf("Expected error code -32601, got %v", resp.Error) + } +} + +func TestMCP_PromptsList(t *testing.T) { + resp := parseResponse(t, sendRequest(newTestMCPServer(), "prompts/list", 1, nil)) + if resp.Error != nil { + t.Fatalf("Unexpected error: %v", resp.Error) + } + prompts := resp.Result.(map[string]any)["prompts"].([]any) + if len(prompts) != 1 || prompts[0].(map[string]any)["name"] != "kubeshark_usage" { + t.Error("Expected 1 prompt named 'kubeshark_usage'") + } +} + +func TestMCP_PromptsGet(t *testing.T) { + resp := parseResponse(t, sendRequest(newTestMCPServer(), 
"prompts/get", 1, map[string]any{"name": "kubeshark_usage"})) + if resp.Error != nil { + t.Fatalf("Unexpected error: %v", resp.Error) + } + messages := resp.Result.(map[string]any)["messages"].([]any) + if len(messages) == 0 { + t.Fatal("Expected at least one message") + } + text := messages[0].(map[string]any)["content"].(map[string]any)["text"].(string) + for _, phrase := range []string{"check_kubeshark_status", "start_kubeshark", "stop_kubeshark"} { + if !strings.Contains(text, phrase) { + t.Errorf("Prompt should contain '%s'", phrase) + } + } +} + +func TestMCP_PromptsGet_UnknownPrompt(t *testing.T) { + resp := parseResponse(t, sendRequest(newTestMCPServer(), "prompts/get", 1, map[string]any{"name": "unknown"})) + if resp.Error == nil || resp.Error.Code != -32602 { + t.Fatalf("Expected error code -32602, got %v", resp.Error) + } +} + +func TestMCP_ToolsList_CLIOnly(t *testing.T) { + resp := parseResponse(t, sendRequest(newTestMCPServer(), "tools/list", 1, nil)) + if resp.Error != nil { + t.Fatalf("Unexpected error: %v", resp.Error) + } + tools := resp.Result.(map[string]any)["tools"].([]any) + if len(tools) != 1 || tools[0].(map[string]any)["name"] != "check_kubeshark_status" { + t.Error("Expected only check_kubeshark_status tool") + } +} + +func TestMCP_ToolsList_WithDestructive(t *testing.T) { + s := &mcpServer{httpClient: &http.Client{}, stdin: &bytes.Buffer{}, stdout: &bytes.Buffer{}, allowDestructive: true} + resp := parseResponse(t, sendRequest(s, "tools/list", 1, nil)) + if resp.Error != nil { + t.Fatalf("Unexpected error: %v", resp.Error) + } + tools := resp.Result.(map[string]any)["tools"].([]any) + toolNames := make(map[string]bool) + for _, tool := range tools { + toolNames[tool.(map[string]any)["name"].(string)] = true + } + for _, expected := range []string{"check_kubeshark_status", "start_kubeshark", "stop_kubeshark"} { + if !toolNames[expected] { + t.Errorf("Missing expected tool: %s", expected) + } + } +} + +func 
TestMCP_ToolsList_WithHubBackend(t *testing.T) { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" || r.URL.Path == "" { + _, _ = w.Write([]byte(`{"name":"hub","tools":[{"name":"list_workloads","description":"","inputSchema":{}},{"name":"list_api_calls","description":"","inputSchema":{}}]}`)) + } + })) + defer mockServer.Close() + + s := &mcpServer{httpClient: &http.Client{}, stdin: &bytes.Buffer{}, stdout: &bytes.Buffer{}, hubBaseURL: mockServer.URL, backendInitialized: true, allowDestructive: true} + resp := parseResponse(t, sendRequest(s, "tools/list", 1, nil)) + if resp.Error != nil { + t.Fatalf("Unexpected error: %v", resp.Error) + } + tools := resp.Result.(map[string]any)["tools"].([]any) + // Should have CLI tools (3) + Hub tools (2) = 5 tools + if len(tools) < 5 { + t.Errorf("Expected at least 5 tools, got %d", len(tools)) + } +} + +func TestMCP_ToolsCallUnknownTool(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + }) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "unknown"})) + if !resp.Result.(map[string]any)["isError"].(bool) { + t.Error("Expected isError=true for unknown tool") + } +} + +func TestMCP_ToolsCallInvalidParams(t *testing.T) { + s := newTestMCPServer() + req := jsonRPCRequest{JSONRPC: "2.0", ID: 1, Method: "tools/call", Params: json.RawMessage(`"invalid"`)} + s.handleRequest(&req) + resp := parseResponse(t, s.stdout.(*bytes.Buffer).String()) + if resp.Error == nil || resp.Error.Code != -32602 { + t.Fatalf("Expected error code -32602") + } +} + +func TestMCP_CheckKubesharkStatus(t *testing.T) { + for _, tc := range []struct { + name string + args map[string]any + }{ + {"no_config", map[string]any{}}, + {"with_namespace", map[string]any{"release_namespace": "custom-ns"}}, + } { + t.Run(tc.name, func(t 
*testing.T) { + resp := parseResponse(t, sendRequest(newTestMCPServer(), "tools/call", 1, mcpCallToolParams{Name: "check_kubeshark_status", Arguments: tc.args})) + if resp.Error != nil { + t.Fatalf("Unexpected error: %v", resp.Error) + } + content := resp.Result.(map[string]any)["content"].([]any) + if len(content) == 0 || content[0].(map[string]any)["text"].(string) == "" { + t.Error("Expected non-empty response") + } + }) + } +} + +func newTestMCPServerWithMockBackend(handler http.HandlerFunc) (*mcpServer, *httptest.Server) { + mockServer := httptest.NewServer(handler) + return &mcpServer{httpClient: &http.Client{}, stdin: &bytes.Buffer{}, stdout: &bytes.Buffer{}, hubBaseURL: mockServer.URL, backendInitialized: true}, mockServer +} + +type hubToolCallRequest struct { + Tool string `json:"tool"` + Arguments map[string]any `json:"arguments"` +} + +func newMockHubHandler(t *testing.T, handler func(req hubToolCallRequest) (string, int)) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/tools/call" || r.Method != http.MethodPost { + w.WriteHeader(http.StatusNotFound) + return + } + var req hubToolCallRequest + _ = json.NewDecoder(r.Body).Decode(&req) + resp, status := handler(req) + w.WriteHeader(status) + _, _ = w.Write([]byte(resp)) + } +} + +func TestMCP_ListWorkloads(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(newMockHubHandler(t, func(req hubToolCallRequest) (string, int) { + if req.Tool != "list_workloads" { + t.Errorf("Expected tool 'list_workloads', got %s", req.Tool) + } + return `{"workloads": [{"name": "test-pod"}]}`, http.StatusOK + })) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "list_workloads", Arguments: map[string]any{"type": "pod"}})) + if resp.Error != nil { + t.Fatalf("Unexpected error: %v", resp.Error) + } + text := resp.Result.(map[string]any)["content"].([]any)[0].(map[string]any)["text"].(string) + if 
!strings.Contains(text, "test-pod") { + t.Errorf("Expected 'test-pod' in response") + } +} + +func TestMCP_ListAPICalls(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(newMockHubHandler(t, func(req hubToolCallRequest) (string, int) { + if req.Tool != "list_api_calls" { + t.Errorf("Expected tool 'list_api_calls', got %s", req.Tool) + } + return `{"calls": [{"id": "123", "path": "/api/users"}]}`, http.StatusOK + })) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "list_api_calls", Arguments: map[string]any{"proto": "http"}})) + if resp.Error != nil { + t.Fatalf("Unexpected error: %v", resp.Error) + } + if !strings.Contains(resp.Result.(map[string]any)["content"].([]any)[0].(map[string]any)["text"].(string), "/api/users") { + t.Error("Expected '/api/users' in response") + } +} + +func TestMCP_GetAPICall(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(newMockHubHandler(t, func(req hubToolCallRequest) (string, int) { + if req.Tool != "get_api_call" || req.Arguments["id"] != "abc123" { + t.Errorf("Expected get_api_call with id=abc123") + } + return `{"id": "abc123", "path": "/api/orders"}`, http.StatusOK + })) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "get_api_call", Arguments: map[string]any{"id": "abc123"}})) + if resp.Error != nil || !strings.Contains(resp.Result.(map[string]any)["content"].([]any)[0].(map[string]any)["text"].(string), "abc123") { + t.Error("Expected response containing 'abc123'") + } +} + +func TestMCP_GetAPICall_MissingID(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(newMockHubHandler(t, func(req hubToolCallRequest) (string, int) { + return `{"error": "id is required"}`, http.StatusBadRequest + })) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "get_api_call", Arguments: map[string]any{}})) + if 
!resp.Result.(map[string]any)["isError"].(bool) { + t.Error("Expected isError=true") + } +} + +func TestMCP_GetAPIStats(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(newMockHubHandler(t, func(req hubToolCallRequest) (string, int) { + if req.Tool != "get_api_stats" { + t.Errorf("Expected get_api_stats") + } + return `{"stats": {"total_calls": 1000}}`, http.StatusOK + })) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "get_api_stats", Arguments: map[string]any{"ns": "prod"}})) + if resp.Error != nil || !strings.Contains(resp.Result.(map[string]any)["content"].([]any)[0].(map[string]any)["text"].(string), "total_calls") { + t.Error("Expected 'total_calls' in response") + } +} + +func TestMCP_APITools_BackendError(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "list_workloads"})) + if !resp.Result.(map[string]any)["isError"].(bool) { + t.Error("Expected isError=true for backend error") + } +} + +func TestMCP_APITools_BackendConnectionError(t *testing.T) { + s := &mcpServer{httpClient: &http.Client{}, stdin: &bytes.Buffer{}, stdout: &bytes.Buffer{}, hubBaseURL: "http://localhost:99999", backendInitialized: true} + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "list_workloads"})) + if !resp.Result.(map[string]any)["isError"].(bool) { + t.Error("Expected isError=true for connection error") + } +} + +func TestMCP_RunLoop_ParseError(t *testing.T) { + output := &bytes.Buffer{} + s := &mcpServer{httpClient: &http.Client{}, stdin: strings.NewReader("invalid\n"), stdout: output} + s.run() + if resp := parseResponse(t, output.String()); resp.Error == nil || resp.Error.Code != -32700 { + t.Fatalf("Expected error code -32700") + } +} 
+ +func TestMCP_RunLoop_MultipleRequests(t *testing.T) { + output := &bytes.Buffer{} + s := &mcpServer{httpClient: &http.Client{}, stdin: strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"} +{"jsonrpc":"2.0","id":2,"method":"ping"} +`), stdout: output} + s.run() + if lines := strings.Split(strings.TrimSpace(output.String()), "\n"); len(lines) != 2 { + t.Fatalf("Expected 2 responses, got %d", len(lines)) + } +} + +func TestMCP_RunLoop_EmptyLines(t *testing.T) { + output := &bytes.Buffer{} + s := &mcpServer{httpClient: &http.Client{}, stdin: strings.NewReader("\n\n{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"ping\"}\n"), stdout: output} + s.run() + if lines := strings.Split(strings.TrimSpace(output.String()), "\n"); len(lines) != 1 { + t.Fatalf("Expected 1 response, got %d", len(lines)) + } +} + +func TestMCP_ResponseFormat(t *testing.T) { + s := newTestMCPServer() + // Numeric ID + if resp := parseResponse(t, sendRequest(s, "ping", 123, nil)); resp.ID != float64(123) || resp.JSONRPC != "2.0" { + t.Errorf("Expected ID 123 and jsonrpc 2.0") + } + // String ID + if resp := parseResponse(t, sendRequest(s, "ping", "str", nil)); resp.ID != "str" { + t.Errorf("Expected ID 'str'") + } +} + +func TestMCP_ToolCallResult_ContentFormat(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data": "test"}`)) + }) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "list_workloads"})) + content := resp.Result.(map[string]any)["content"].([]any) + if len(content) == 0 || content[0].(map[string]any)["type"] != "text" { + t.Error("Expected content with type=text") + } +} + +func TestMCP_CommandArgs(t *testing.T) { + // Test start command args building + for _, tc := range []struct { + args map[string]any + expected string + }{ + {map[string]any{}, "tap --set headless=true"}, + {map[string]any{"pod_regex": "nginx.*"}, "tap nginx.* 
--set headless=true"}, + {map[string]any{"namespaces": "default"}, "tap -n default --set headless=true"}, + {map[string]any{"release_namespace": "ks"}, "tap -s ks --set headless=true"}, + } { + cmdArgs := []string{"tap"} + if v, _ := tc.args["pod_regex"].(string); v != "" { + cmdArgs = append(cmdArgs, v) + } + if v, _ := tc.args["namespaces"].(string); v != "" { + for _, ns := range strings.Split(v, ",") { + cmdArgs = append(cmdArgs, "-n", strings.TrimSpace(ns)) + } + } + if v, _ := tc.args["release_namespace"].(string); v != "" { + cmdArgs = append(cmdArgs, "-s", v) + } + cmdArgs = append(cmdArgs, "--set", "headless=true") + if got := strings.Join(cmdArgs, " "); got != tc.expected { + t.Errorf("Expected %q, got %q", tc.expected, got) + } + } +} + +func TestMCP_PrettyPrintJSON(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"key":"value"}`)) + }) + defer mockServer.Close() + + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "list_workloads"})) + text := resp.Result.(map[string]any)["content"].([]any)[0].(map[string]any)["text"].(string) + if !strings.Contains(text, "\n") { + t.Error("Expected pretty-printed JSON") + } +} + +func TestMCP_SpecialCharsAndEdgeCases(t *testing.T) { + s, mockServer := newTestMCPServerWithMockBackend(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{}`)) + }) + defer mockServer.Close() + + // Test special chars, empty args, nil args + for _, args := range []map[string]any{ + {"path": "/api?id=123"}, + {"id": "abc/123"}, + {}, + nil, + } { + resp := parseResponse(t, sendRequest(s, "tools/call", 1, mcpCallToolParams{Name: "list_workloads", Arguments: args})) + if resp.Error != nil { + t.Errorf("Unexpected error with args %v: %v", args, resp.Error) + } + } +} + +func TestMCP_BackendInitialization_Concurrent(t *testing.T) { + s := newTestMCPServer() + done := make(chan bool, 10) + for i := 0; i < 
10; i++ { + go func() { s.ensureBackendConnection(); done <- true }() + } + for i := 0; i < 10; i++ { + <-done + } +} + +func TestMCP_FullConversation(t *testing.T) { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + _, _ = w.Write([]byte(`{"name":"hub","tools":[{"name":"list_workloads","description":"","inputSchema":{}}]}`)) + } else if r.URL.Path == "/tools/call" { + _, _ = w.Write([]byte(`{"data":"ok"}`)) + } + })) + defer mockServer.Close() + + input := `{"jsonrpc":"2.0","id":1,"method":"initialize"} +{"jsonrpc":"2.0","method":"notifications/initialized"} +{"jsonrpc":"2.0","id":2,"method":"tools/list"} +{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"list_workloads","arguments":{}}} +` + output := &bytes.Buffer{} + s := &mcpServer{httpClient: &http.Client{}, stdin: strings.NewReader(input), stdout: output, hubBaseURL: mockServer.URL, backendInitialized: true} + s.run() + + lines := strings.Split(strings.TrimSpace(output.String()), "\n") + if len(lines) != 3 { // 3 responses (notification has no response) + t.Errorf("Expected 3 responses, got %d", len(lines)) + } + for i, line := range lines { + var resp jsonRPCResponse + if err := json.Unmarshal([]byte(line), &resp); err != nil || resp.Error != nil { + t.Errorf("Response %d: parse error or unexpected error", i) + } + } +} diff --git a/integration/README.md b/integration/README.md new file mode 100644 index 000000000..aa1466fb6 --- /dev/null +++ b/integration/README.md @@ -0,0 +1,57 @@ +# Integration Tests + +This directory contains integration tests that run against a real Kubernetes cluster. + +## Prerequisites + +1. **Kubernetes cluster** - A running cluster accessible via `kubectl` +2. **kubectl** - Configured with appropriate context +3. 
**Go 1.21+** - For running tests

+## Running Tests
+
+```bash
+# Run all integration tests
+make test-integration
+
+# Run specific command tests
+make test-integration-mcp
+
+# Run quick tests (skips long-running tests; -v output is always on)
+make test-integration-short
+
+# Run with custom timeout (default: 5m)
+INTEGRATION_TIMEOUT=10m make test-integration
+```
+
+## Environment Variables
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `KUBESHARK_BINARY` | Auto-built | Path to a pre-built kubeshark binary |
+| `INTEGRATION_TIMEOUT` | `5m` | Test timeout duration |
+| `KUBECONFIG` | `~/.kube/config` | Kubernetes config file |
+| `INTEGRATION_SKIP_CLEANUP` | `false` | Skip cleanup after tests (for debugging) |
+
+## Test Structure
+
+```
+integration/
+├── README.md         # This file
+├── common_test.go    # Shared test helpers
+├── mcp_test.go       # MCP command integration tests
+├── tap_test.go       # Tap command tests (future)
+└── ...               # Additional command tests
+```
+
+## Writing New Tests
+
+1. Create `_test.go` with build tag `//go:build integration`
+2. Use helpers from `common_test.go`: `requireKubernetesCluster(t)`, `getKubesharkBinary(t)`, `cleanupKubeshark(t, binary)`
+
+## CI/CD Integration
+
+```bash
+# JSON output for CI parsing
+go test -tags=integration -json ./integration/...
+```
diff --git a/integration/common_test.go b/integration/common_test.go
new file mode 100644
index 000000000..4d658d82b
--- /dev/null
+++ b/integration/common_test.go
@@ -0,0 +1,217 @@
+//go:build integration
+
+// Package integration contains integration tests that run against a real Kubernetes cluster.
+// Run with: go test -tags=integration ./integration/...
+package integration
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+)
+
+const (
+	binaryName     = "kubeshark"
+	defaultTimeout = 2 * time.Minute
+	startupTimeout = 3 * time.Minute
+)
+
+var (
+	// binaryPath caches the built binary path
+	binaryPath string
+	buildOnce  sync.Once
+	buildErr   error
+)
+
+// requireKubernetesCluster skips the test if no Kubernetes cluster is available.
+func requireKubernetesCluster(t *testing.T) {
+	t.Helper()
+	if !hasKubernetesCluster() {
+		t.Skip("Skipping: no Kubernetes cluster available")
+	}
+}
+
+// hasKubernetesCluster returns true if a Kubernetes cluster is accessible.
+func hasKubernetesCluster() bool {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	return exec.CommandContext(ctx, "kubectl", "cluster-info").Run() == nil
+}
+
+// getKubesharkBinary returns the path to the kubeshark binary, building it if necessary.
+// It takes testing.TB so both tests (*testing.T) and benchmarks (*testing.B) can use it.
+func getKubesharkBinary(t testing.TB) string {
+	t.Helper()
+
+	// Check if binary path is provided via environment
+	if envBinary := os.Getenv("KUBESHARK_BINARY"); envBinary != "" {
+		if _, err := os.Stat(envBinary); err == nil {
+			return envBinary
+		}
+		t.Fatalf("KUBESHARK_BINARY set but file not found: %s", envBinary)
+	}
+
+	// Build once per test run
+	buildOnce.Do(func() {
+		binaryPath, buildErr = buildBinary(t)
+	})
+
+	if buildErr != nil {
+		t.Fatalf("Failed to build binary: %v", buildErr)
+	}
+
+	return binaryPath
+}
+
+// buildBinary compiles the binary and returns its path.
+func buildBinary(t testing.TB) (string, error) {
+	t.Helper()
+
+	// Find project root (directory containing go.mod)
+	projectRoot, err := findProjectRoot()
+	if err != nil {
+		return "", fmt.Errorf("finding project root: %w", err)
+	}
+
+	outputPath := filepath.Join(projectRoot, "bin", binaryName+"_integration_test")
+
+	t.Logf("Building %s binary at %s", binaryName, outputPath)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	defer cancel()
+
+	cmd := exec.CommandContext(ctx, "go", "build",
+		"-o", outputPath,
+		filepath.Join(projectRoot, binaryName+".go"),
+	)
+	cmd.Dir = projectRoot
+
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		return "", fmt.Errorf("build failed: %w\nOutput: %s", err, output)
+	}
+
+	return outputPath, nil
+}
+
+// findProjectRoot locates the project root by finding go.mod.
+func findProjectRoot() (string, error) {
+	dir, err := os.Getwd()
+	if err != nil {
+		return "", err
+	}
+
+	for {
+		if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil {
+			return dir, nil
+		}
+
+		parent := filepath.Dir(dir)
+		if parent == dir {
+			return "", fmt.Errorf("could not find go.mod in any parent directory")
+		}
+		dir = parent
+	}
+}
+
+// runKubeshark executes the kubeshark binary with the given arguments.
+// Returns combined stdout/stderr and any error.
+func runKubeshark(t *testing.T, binary string, args ...string) (string, error) {
+	t.Helper()
+	return runKubesharkWithTimeout(t, binary, defaultTimeout, args...)
+}
+
+// runKubesharkWithTimeout executes the kubeshark binary with a custom timeout.
+func runKubesharkWithTimeout(t *testing.T, binary string, timeout time.Duration, args ...string) (string, error) {
+	t.Helper()
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	t.Logf("Running: %s %s", binary, strings.Join(args, " "))
+
+	cmd := exec.CommandContext(ctx, binary, args...)
+
+	var stdout, stderr bytes.Buffer
+	cmd.Stdout = &stdout
+	cmd.Stderr = &stderr
+
+	err := cmd.Run()
+
+	output := stdout.String()
+	if stderr.Len() > 0 {
+		output += "\n[stderr]\n" + stderr.String()
+	}
+
+	if ctx.Err() == context.DeadlineExceeded {
+		return output, fmt.Errorf("command timed out after %v", timeout)
+	}
+
+	return output, err
+}
+
+// cleanupKubeshark ensures Kubeshark is not running in the cluster.
+func cleanupKubeshark(t *testing.T, binary string) {
+	t.Helper()
+
+	if os.Getenv("INTEGRATION_SKIP_CLEANUP") == "true" {
+		t.Log("Skipping cleanup (INTEGRATION_SKIP_CLEANUP=true)")
+		return
+	}
+
+	t.Log("Cleaning up any existing Kubeshark installation...")
+
+	// Run clean command, ignore errors (might not be installed)
+	_, _ = runKubeshark(t, binary, "clean")
+
+	// Wait a moment for resources to be deleted
+	time.Sleep(2 * time.Second)
+}
+
+// waitForKubesharkReady waits for Kubeshark to be ready after starting.
+func waitForKubesharkReady(t *testing.T, binary string, timeout time.Duration) error {
+	t.Helper()
+
+	t.Log("Waiting for Kubeshark to be ready...")
+
+	deadline := time.Now().Add(timeout)
+
+	for time.Now().Before(deadline) {
+		// Check if pods are running
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		cmd := exec.CommandContext(ctx, "kubectl", "get", "pods", "-l", "app.kubernetes.io/name=kubeshark", "-o", "jsonpath={.items[*].status.phase}")
+		output, err := cmd.Output()
+		cancel()
+
+		if err == nil && strings.Contains(string(output), "Running") {
+			t.Log("Kubeshark is ready")
+			return nil
+		}
+
+		time.Sleep(5 * time.Second)
+	}
+
+	return fmt.Errorf("timeout waiting for Kubeshark to be ready")
+}
+
+// isKubesharkRunning checks if Kubeshark is currently running in the cluster.
+func isKubesharkRunning(t *testing.T) bool {
+	t.Helper()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	cmd := exec.CommandContext(ctx, "kubectl", "get", "pods", "-l", "app.kubernetes.io/name=kubeshark", "-o", "name")
+	output, err := cmd.Output()
+	if err != nil {
+		return false
+	}
+
+	return strings.TrimSpace(string(output)) != ""
+}
diff --git a/integration/mcp_test.go b/integration/mcp_test.go
new file mode 100644
index 000000000..63394aae9
--- /dev/null
+++ b/integration/mcp_test.go
@@ -0,0 +1,529 @@
+//go:build integration
+
+package integration
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"encoding/json"
+	"io"
+	"os/exec"
+	"strings"
+	"testing"
+	"time"
+)
+
+// MCPRequest represents a JSON-RPC request
+type MCPRequest struct {
+	JSONRPC string      `json:"jsonrpc"`
+	ID      int         `json:"id"`
+	Method  string      `json:"method"`
+	Params  interface{} `json:"params,omitempty"`
+}
+
+// MCPResponse represents a JSON-RPC response
+type MCPResponse struct {
+	JSONRPC string          `json:"jsonrpc"`
+	ID      int             `json:"id"`
+	Result  json.RawMessage `json:"result,omitempty"`
+	Error   *MCPError       `json:"error,omitempty"`
+}
+
+// MCPError represents a JSON-RPC error
+type MCPError struct {
+	Code    int    `json:"code"`
+	Message string `json:"message"`
+}
+
+// mcpSession represents a running MCP server session
+type mcpSession struct {
+	cmd    *exec.Cmd
+	stdin  io.WriteCloser
+	stdout *bufio.Reader
+	stderr *bytes.Buffer // Captured stderr for debugging
+	cancel context.CancelFunc
+}
+
+// startMCPSession starts an MCP server and returns a session for sending requests.
+// By default, starts in read-only mode (no --allow-destructive).
+func startMCPSession(t *testing.T, binary string, args ...string) *mcpSession {
+	t.Helper()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	cmdArgs := append([]string{"mcp"}, args...)
+	cmd := exec.CommandContext(ctx, binary, cmdArgs...)
+ + stdin, err := cmd.StdinPipe() + if err != nil { + cancel() + t.Fatalf("Failed to create stdin pipe: %v", err) + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + cancel() + t.Fatalf("Failed to create stdout pipe: %v", err) + } + + // Capture stderr for debugging + var stderrBuf bytes.Buffer + cmd.Stderr = &stderrBuf + + if err := cmd.Start(); err != nil { + cancel() + t.Fatalf("Failed to start MCP server: %v", err) + } + + return &mcpSession{ + cmd: cmd, + stdin: stdin, + stdout: bufio.NewReader(stdout), + stderr: &stderrBuf, + cancel: cancel, + } +} + +// startMCPSessionWithDestructive starts an MCP server with --allow-destructive flag. +func startMCPSessionWithDestructive(t *testing.T, binary string, args ...string) *mcpSession { + t.Helper() + allArgs := append([]string{"--allow-destructive"}, args...) + return startMCPSession(t, binary, allArgs...) +} + +// sendRequest sends a JSON-RPC request and returns the response (30s timeout). +func (s *mcpSession) sendRequest(t *testing.T, req MCPRequest) MCPResponse { + t.Helper() + return s.sendRequestWithTimeout(t, req, 30*time.Second) +} + +// sendRequestWithTimeout sends a JSON-RPC request with a custom timeout. 
+func (s *mcpSession) sendRequestWithTimeout(t *testing.T, req MCPRequest, timeout time.Duration) MCPResponse { + t.Helper() + + reqBytes, err := json.Marshal(req) + if err != nil { + t.Fatalf("Failed to marshal request: %v", err) + } + + t.Logf("Sending: %s", string(reqBytes)) + + if _, err := s.stdin.Write(append(reqBytes, '\n')); err != nil { + t.Fatalf("Failed to write request: %v", err) + } + + // Read response with timeout + responseChan := make(chan string, 1) + errChan := make(chan error, 1) + + go func() { + line, err := s.stdout.ReadString('\n') + if err != nil { + errChan <- err + return + } + responseChan <- line + }() + + select { + case line := <-responseChan: + t.Logf("Received: %s", strings.TrimSpace(line)) + var resp MCPResponse + if err := json.Unmarshal([]byte(line), &resp); err != nil { + t.Fatalf("Failed to unmarshal response: %v\nResponse: %s", err, line) + } + return resp + case err := <-errChan: + t.Fatalf("Failed to read response: %v", err) + return MCPResponse{} + case <-time.After(timeout): + t.Fatalf("Timeout waiting for MCP response after %v", timeout) + return MCPResponse{} + } +} + +// callTool invokes an MCP tool and returns the response (30s timeout). +func (s *mcpSession) callTool(t *testing.T, id int, toolName string, args map[string]interface{}) MCPResponse { + t.Helper() + return s.callToolWithTimeout(t, id, toolName, args, 30*time.Second) +} + +// callToolWithTimeout invokes an MCP tool with a custom timeout. +func (s *mcpSession) callToolWithTimeout(t *testing.T, id int, toolName string, args map[string]interface{}, timeout time.Duration) MCPResponse { + t.Helper() + + return s.sendRequestWithTimeout(t, MCPRequest{ + JSONRPC: "2.0", + ID: id, + Method: "tools/call", + Params: map[string]interface{}{ + "name": toolName, + "arguments": args, + }, + }, timeout) +} + +// close terminates the MCP session. 
+func (s *mcpSession) close() { + s.cancel() + _ = s.cmd.Wait() +} + +// getStderr returns any captured stderr output (useful for debugging failures). +func (s *mcpSession) getStderr() string { + if s.stderr == nil { + return "" + } + return s.stderr.String() +} + +// initialize sends the MCP initialize request and returns the response. +func (s *mcpSession) initialize(t *testing.T, id int) MCPResponse { + t.Helper() + return s.sendRequest(t, MCPRequest{ + JSONRPC: "2.0", + ID: id, + Method: "initialize", + Params: map[string]interface{}{ + "protocolVersion": "2024-11-05", + "capabilities": map[string]interface{}{}, + "clientInfo": map[string]interface{}{"name": "test", "version": "1.0"}, + }, + }) +} + +// TestMCP_Initialize tests the MCP initialization handshake. +func TestMCP_Initialize(t *testing.T) { + requireKubernetesCluster(t) + session := startMCPSession(t, getKubesharkBinary(t)) + defer session.close() + + resp := session.initialize(t, 1) + if resp.Error != nil { + t.Fatalf("Initialize failed: %s", resp.Error.Message) + } + + var result map[string]interface{} + if err := json.Unmarshal(resp.Result, &result); err != nil { + t.Fatalf("Failed to parse result: %v", err) + } + + if _, ok := result["capabilities"]; !ok { + t.Error("Response missing capabilities") + } + if _, ok := result["serverInfo"]; !ok { + t.Error("Response missing serverInfo") + } +} + +// TestMCP_ToolsList_ReadOnly tests that tools/list returns only safe tools in read-only mode. 
+func TestMCP_ToolsList_ReadOnly(t *testing.T) { + requireKubernetesCluster(t) + session := startMCPSession(t, getKubesharkBinary(t)) + defer session.close() + + session.initialize(t, 1) + resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"}) + if resp.Error != nil { + t.Fatalf("tools/list failed: %s", resp.Error.Message) + } + + var result struct { + Tools []struct{ Name string `json:"name"` } `json:"tools"` + } + if err := json.Unmarshal(resp.Result, &result); err != nil { + t.Fatalf("Failed to parse result: %v", err) + } + + toolNames := make(map[string]bool) + for _, tool := range result.Tools { + toolNames[tool.Name] = true + } + + if !toolNames["check_kubeshark_status"] { + t.Error("Missing expected tool: check_kubeshark_status") + } + if toolNames["start_kubeshark"] || toolNames["stop_kubeshark"] { + t.Error("Destructive tools should not be available in read-only mode") + } +} + +// TestMCP_ToolsList_WithDestructive tests that tools/list includes destructive tools when flag is set. 
+func TestMCP_ToolsList_WithDestructive(t *testing.T) { + requireKubernetesCluster(t) + session := startMCPSessionWithDestructive(t, getKubesharkBinary(t)) + defer session.close() + + session.initialize(t, 1) + resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"}) + if resp.Error != nil { + t.Fatalf("tools/list failed: %s", resp.Error.Message) + } + + var result struct { + Tools []struct{ Name string `json:"name"` } `json:"tools"` + } + if err := json.Unmarshal(resp.Result, &result); err != nil { + t.Fatalf("Failed to parse result: %v", err) + } + + toolNames := make(map[string]bool) + for _, tool := range result.Tools { + toolNames[tool.Name] = true + } + + for _, expected := range []string{"check_kubeshark_status", "start_kubeshark", "stop_kubeshark"} { + if !toolNames[expected] { + t.Errorf("Missing expected tool: %s", expected) + } + } +} + +// TestMCP_CheckKubesharkStatus_NotRunning tests check_kubeshark_status when Kubeshark is not running. +func TestMCP_CheckKubesharkStatus_NotRunning(t *testing.T) { + requireKubernetesCluster(t) + binary := getKubesharkBinary(t) + cleanupKubeshark(t, binary) + + session := startMCPSession(t, binary) + defer session.close() + + session.initialize(t, 1) + resp := session.callTool(t, 2, "check_kubeshark_status", nil) + if resp.Error != nil { + t.Fatalf("check_kubeshark_status failed: %s", resp.Error.Message) + } + + var result struct { + Content []struct{ Text string `json:"text"` } `json:"content"` + } + if err := json.Unmarshal(resp.Result, &result); err != nil { + t.Fatalf("Failed to parse result: %v", err) + } + + if len(result.Content) == 0 || (!strings.Contains(result.Content[0].Text, "not running") && !strings.Contains(result.Content[0].Text, "NOT")) { + t.Errorf("Expected 'not running' status") + } +} + +// TestMCP_StartKubeshark tests the start_kubeshark tool. 
+func TestMCP_StartKubeshark(t *testing.T) { + if testing.Short() { + t.Skip("Skipping in short mode") + } + requireKubernetesCluster(t) + binary := getKubesharkBinary(t) + cleanupKubeshark(t, binary) + t.Cleanup(func() { cleanupKubeshark(t, binary) }) + + session := startMCPSessionWithDestructive(t, binary) + defer session.close() + + session.initialize(t, 1) + resp := session.callToolWithTimeout(t, 2, "start_kubeshark", nil, 3*time.Minute) + if resp.Error != nil { + t.Fatalf("start_kubeshark failed: %s", resp.Error.Message) + } + + if !isKubesharkRunning(t) { + t.Error("Kubeshark should be running after start_kubeshark") + } +} + +// TestMCP_StartKubeshark_WithoutFlag tests that start_kubeshark fails without --allow-destructive. +func TestMCP_StartKubeshark_WithoutFlag(t *testing.T) { + requireKubernetesCluster(t) + session := startMCPSession(t, getKubesharkBinary(t)) + defer session.close() + + session.initialize(t, 1) + resp := session.callTool(t, 2, "start_kubeshark", nil) + + var result struct { + Content []struct{ Text string `json:"text"` } `json:"content"` + IsError bool `json:"isError"` + } + if err := json.Unmarshal(resp.Result, &result); err != nil { + t.Fatalf("Failed to parse result: %v", err) + } + + if !result.IsError { + t.Error("Expected isError=true without --allow-destructive") + } +} + +// TestMCP_StopKubeshark tests the stop_kubeshark tool. 
+func TestMCP_StopKubeshark(t *testing.T) { + if testing.Short() { + t.Skip("Skipping in short mode") + } + requireKubernetesCluster(t) + binary := getKubesharkBinary(t) + + session := startMCPSessionWithDestructive(t, binary) + defer session.close() + + session.initialize(t, 0) + + // Start Kubeshark if not running + if !isKubesharkRunning(t) { + resp := session.callToolWithTimeout(t, 1, "start_kubeshark", nil, 2*time.Minute) + if resp.Error != nil { + t.Skipf("Could not start Kubeshark: %v", resp.Error.Message) + } + } + + resp := session.callToolWithTimeout(t, 2, "stop_kubeshark", nil, 2*time.Minute) + if resp.Error != nil { + t.Fatalf("stop_kubeshark failed: %s", resp.Error.Message) + } + + time.Sleep(5 * time.Second) + if isKubesharkRunning(t) { + t.Error("Kubeshark should not be running after stop_kubeshark") + } +} + +// TestMCP_StopKubeshark_WithoutFlag tests that stop_kubeshark fails without --allow-destructive. +func TestMCP_StopKubeshark_WithoutFlag(t *testing.T) { + requireKubernetesCluster(t) + session := startMCPSession(t, getKubesharkBinary(t)) + defer session.close() + + session.initialize(t, 1) + resp := session.callTool(t, 2, "stop_kubeshark", nil) + + var result struct { + IsError bool `json:"isError"` + } + if err := json.Unmarshal(resp.Result, &result); err != nil { + t.Fatalf("Failed to parse result: %v", err) + } + + if !result.IsError { + t.Error("Expected isError=true without --allow-destructive") + } +} + +// TestMCP_FullLifecycle tests the complete lifecycle: check -> start -> check -> stop -> check +func TestMCP_FullLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("Skipping in short mode") + } + requireKubernetesCluster(t) + binary := getKubesharkBinary(t) + cleanupKubeshark(t, binary) + + session := startMCPSessionWithDestructive(t, binary) + defer session.close() + + session.initialize(t, 1) + + // Check -> Start -> Check -> Stop -> Check + if resp := session.callTool(t, 2, "check_kubeshark_status", nil); resp.Error != nil { + 
t.Fatalf("Initial status check failed: %s", resp.Error.Message) + } + + if resp := session.callToolWithTimeout(t, 3, "start_kubeshark", nil, 3*time.Minute); resp.Error != nil { + t.Fatalf("Start failed: %s", resp.Error.Message) + } + if err := waitForKubesharkReady(t, binary, startupTimeout); err != nil { + t.Fatalf("Kubeshark did not become ready: %v", err) + } + + if resp := session.callTool(t, 4, "check_kubeshark_status", nil); resp.Error != nil { + t.Fatalf("Status check after start failed: %s", resp.Error.Message) + } + + if resp := session.callToolWithTimeout(t, 5, "stop_kubeshark", nil, 2*time.Minute); resp.Error != nil { + t.Fatalf("Stop failed: %s", resp.Error.Message) + } + time.Sleep(5 * time.Second) + + if resp := session.callTool(t, 6, "check_kubeshark_status", nil); resp.Error != nil { + t.Fatalf("Final status check failed: %s", resp.Error.Message) + } +} + +// TestMCP_APIToolsRequireKubeshark tests that API tools return helpful errors when Kubeshark isn't running. +func TestMCP_APIToolsRequireKubeshark(t *testing.T) { + requireKubernetesCluster(t) + binary := getKubesharkBinary(t) + cleanupKubeshark(t, binary) + + session := startMCPSession(t, binary) + defer session.close() + + session.initialize(t, 1) + + for i, tool := range []string{"list_workloads", "list_api_calls", "get_api_stats"} { + resp := session.callTool(t, i+2, tool, nil) + // Either error or helpful message is acceptable + if resp.Error != nil { + t.Logf("%s returned error (expected): %s", tool, resp.Error.Message) + } + } +} + +// TestMCP_SetFlags tests that --set flags are passed correctly. 
+func TestMCP_SetFlags(t *testing.T) { + requireKubernetesCluster(t) + session := startMCPSession(t, getKubesharkBinary(t), "--set", "tap.namespaces={default}") + defer session.close() + + session.initialize(t, 1) + resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"}) + if resp.Error != nil { + t.Fatalf("tools/list failed with --set flags: %s", resp.Error.Message) + } +} + +// BenchmarkMCP_CheckStatus benchmarks the check_kubeshark_status tool. +func BenchmarkMCP_CheckStatus(b *testing.B) { + if testing.Short() { + b.Skip("Skipping benchmark in short mode") + } + if !hasKubernetesCluster() { + b.Skip("Skipping: no Kubernetes cluster available") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cmd := exec.CommandContext(ctx, getKubesharkBinary(b), "mcp") + stdin, _ := cmd.StdinPipe() + stdout, _ := cmd.StdoutPipe() + reader := bufio.NewReader(stdout) + + if err := cmd.Start(); err != nil { + b.Fatalf("Failed to start MCP: %v", err) + } + defer func() { cancel(); _ = cmd.Wait() }() + + // Initialize + initReq, _ := json.Marshal(MCPRequest{ + JSONRPC: "2.0", ID: 0, Method: "initialize", + Params: map[string]interface{}{ + "protocolVersion": "2024-11-05", + "capabilities": map[string]interface{}{}, + "clientInfo": map[string]interface{}{"name": "bench", "version": "1.0"}, + }, + }) + _, _ = stdin.Write(append(initReq, '\n')) + _, _ = reader.ReadString('\n') + + b.ResetTimer() + for i := 0; i < b.N; i++ { + req, _ := json.Marshal(MCPRequest{ + JSONRPC: "2.0", ID: i + 1, Method: "tools/call", + Params: map[string]interface{}{"name": "check_kubeshark_status", "arguments": map[string]interface{}{}}, + }) + if _, err := stdin.Write(append(req, '\n')); err != nil { + b.Fatalf("Write failed: %v", err) + } + if _, err := reader.ReadString('\n'); err != nil { + b.Fatalf("Read failed: %v", err) + } + } +} diff --git a/mcp/README.md b/mcp/README.md new file mode 100644 index 000000000..e8d7f8d02 --- /dev/null 
+++ b/mcp/README.md @@ -0,0 +1,185 @@ +# Kubeshark MCP Server + +[Kubeshark](https://kubeshark.com) MCP (Model Context Protocol) server enables AI assistants like Claude Desktop, Cursor, and other MCP-compatible clients to query real-time Kubernetes network traffic. + +## Features + +- **L7 API Traffic Analysis**: Query HTTP, gRPC, Redis, Kafka, DNS transactions +- **L4 Network Flows**: View TCP/UDP flows with traffic statistics +- **Cluster Management**: Start/stop Kubeshark deployments (with safety controls) +- **PCAP Snapshots**: Create and export network captures +- **Built-in Prompts**: Pre-configured prompts for common analysis tasks + +## Installation + +### 1. Install Kubeshark CLI + +```bash +# macOS +brew install kubeshark + +# Linux +sh <(curl -Ls https://kubeshark.com/install) + +# Windows (PowerShell) +choco install kubeshark +``` + +Or download from [GitHub Releases](https://github.com/kubeshark/kubeshark/releases). + +### 2. Configure Claude Desktop + +Add to your Claude Desktop configuration: + +**macOS**: `~/Library/Application Support/Claude/claude_desktop_config.json` +**Windows**: `%APPDATA%\Claude\claude_desktop_config.json` + +#### URL Mode (Recommended for existing deployments) + +```json +{ + "mcpServers": { + "kubeshark": { + "command": "kubeshark", + "args": ["mcp", "--url", "https://kubeshark.example.com"] + } + } +} +``` + +#### Proxy Mode (Requires kubectl access) + +```json +{ + "mcpServers": { + "kubeshark": { + "command": "kubeshark", + "args": ["mcp", "--kubeconfig", "/path/to/.kube/config"] + } + } +} +``` + +#### With Destructive Operations + +```json +{ + "mcpServers": { + "kubeshark": { + "command": "kubeshark", + "args": ["mcp", "--allow-destructive", "--kubeconfig", "/path/to/.kube/config"] + } + } +} +``` + +### 3. 
Generate Configuration + +Use the CLI to generate configuration: + +```bash +kubeshark mcp --mcp-config --url https://kubeshark.example.com +``` + +## Available Tools + +### Traffic Analysis (All Modes) + +| Tool | Description | +|------|-------------| +| `list_workloads` | List pods, services, namespaces with observed traffic | +| `list_api_calls` | Query L7 API transactions with KFL filtering | +| `get_api_call` | Get detailed info about a specific API call | +| `get_api_stats` | Get aggregated API statistics | +| `list_l4_flows` | List L4 (TCP/UDP) network flows | +| `get_l4_flow_summary` | Get L4 connectivity summary | +| `list_snapshots` | List all PCAP snapshots | +| `create_snapshot` | Create a new PCAP snapshot | +| `get_dissection_status` | Check L7 protocol parsing status | +| `enable_dissection` | Enable L7 protocol dissection | +| `disable_dissection` | Disable L7 protocol dissection | + +### Cluster Management (Proxy Mode Only) + +| Tool | Description | Requires | +|------|-------------|----------| +| `check_kubeshark_status` | Check if Kubeshark is running | - | +| `start_kubeshark` | Deploy Kubeshark to cluster | `--allow-destructive` | +| `stop_kubeshark` | Remove Kubeshark from cluster | `--allow-destructive` | + +## Available Prompts + +| Prompt | Description | +|--------|-------------| +| `analyze_traffic` | Analyze API traffic patterns and identify issues | +| `find_errors` | Find and summarize API errors and failures | +| `trace_request` | Trace a request path through microservices | +| `show_topology` | Show service communication topology | +| `latency_analysis` | Analyze latency patterns and identify slow endpoints | +| `security_audit` | Audit traffic for security concerns | +| `compare_traffic` | Compare traffic patterns between time periods | +| `debug_connection` | Debug connectivity issues between services | + +## Example Conversations + +``` +User: Show me all HTTP 500 errors in the last hour + +Claude: I'll query the API traffic for 
500 errors. +[Calling list_api_calls with kfl="http and response.status == 500"] + +Found 12 HTTP 500 errors: +1. POST /api/checkout -> payment-service (500) + Time: 10:23:45 | Latency: 2340ms +... +``` + +``` +User: What services are communicating with the database? + +Claude: Let me check the L4 flows to the database. +[Calling list_l4_flows with dst_filter="postgres"] + +Found 5 services connecting to postgres:5432: +- orders-service: 456KB transferred +- users-service: 123KB transferred +... +``` + +## CLI Options + +| Option | Description | +|--------|-------------| +| `--url` | Direct URL to Kubeshark Hub | +| `--kubeconfig` | Path to kubeconfig file | +| `--allow-destructive` | Enable start/stop operations | +| `--list-tools` | List available tools and exit | +| `--mcp-config` | Print Claude Desktop config JSON | + +## KFL (Kubeshark Filter Language) + +Query traffic using KFL syntax: + +``` +# HTTP requests to a specific path +http and request.path == "/api/users" + +# Errors only +response.status >= 400 + +# Specific source pod +src.pod.name == "frontend-.*" + +# Multiple conditions +http and src.namespace == "default" and response.status == 500 +``` + +## Links + +- [Documentation](https://docs.kubeshark.com/en/mcp) +- [GitHub](https://github.com/kubeshark/kubeshark) +- [Website](https://kubeshark.com) + +## License + +Apache-2.0 diff --git a/mcp/server.json b/mcp/server.json new file mode 100644 index 000000000..e0d02196a --- /dev/null +++ b/mcp/server.json @@ -0,0 +1,205 @@ +{ + "$schema": "https://registry.modelcontextprotocol.io/schemas/server.schema.json", + "name": "com.kubeshark/mcp", + "displayName": "Kubeshark", + "description": "Real-time Kubernetes network traffic visibility and API analysis. 
Query L7 API transactions (HTTP, gRPC, Redis, Kafka, DNS), L4 network flows, and manage Kubeshark deployments directly from AI assistants.", + "icon": "https://kubeshark.com/favicon.ico", + "repository": { + "url": "https://github.com/kubeshark/kubeshark", + "source": "github" + }, + "homepage": "https://kubeshark.com", + "license": "Apache-2.0", + "version": "52.12.0", + "authors": [ + { + "name": "Kubeshark", + "url": "https://kubeshark.com" + } + ], + "categories": [ + "kubernetes", + "networking", + "observability", + "debugging", + "security" + ], + "tags": [ + "kubernetes", + "network", + "traffic", + "api", + "http", + "grpc", + "kafka", + "redis", + "dns", + "pcap", + "wireshark", + "tcpdump", + "observability", + "debugging", + "microservices" + ], + "packages": [ + { + "registryType": "github-releases", + "name": "kubeshark/kubeshark", + "version": "52.12.0", + "runtime": "binary", + "platforms": [ + "darwin-arm64", + "darwin-amd64", + "linux-arm64", + "linux-amd64", + "windows-amd64" + ], + "transport": { + "type": "stdio", + "command": "kubeshark", + "args": ["mcp"] + } + } + ], + "tools": [ + { + "name": "check_kubeshark_status", + "description": "Check if Kubeshark is currently running in the cluster. Read-only operation.", + "mode": "proxy" + }, + { + "name": "start_kubeshark", + "description": "Deploy Kubeshark to the Kubernetes cluster. Requires --allow-destructive flag.", + "mode": "proxy", + "destructive": true + }, + { + "name": "stop_kubeshark", + "description": "Remove Kubeshark from the Kubernetes cluster. 
Requires --allow-destructive flag.", + "mode": "proxy", + "destructive": true + }, + { + "name": "list_workloads", + "description": "List pods, services, namespaces, and nodes with observed L7 traffic.", + "mode": "all" + }, + { + "name": "list_api_calls", + "description": "Query L7 API transactions (HTTP, gRPC, Redis, Kafka, DNS) with KFL filtering.", + "mode": "all" + }, + { + "name": "get_api_call", + "description": "Get detailed information about a specific API call including headers and body.", + "mode": "all" + }, + { + "name": "get_api_stats", + "description": "Get aggregated API statistics and metrics.", + "mode": "all" + }, + { + "name": "list_l4_flows", + "description": "List L4 (TCP/UDP) network flows with traffic statistics.", + "mode": "all" + }, + { + "name": "get_l4_flow_summary", + "description": "Get L4 connectivity summary including top talkers and cross-namespace traffic.", + "mode": "all" + }, + { + "name": "list_snapshots", + "description": "List all PCAP snapshots.", + "mode": "all" + }, + { + "name": "create_snapshot", + "description": "Create a new PCAP snapshot of captured traffic.", + "mode": "all" + }, + { + "name": "get_dissection_status", + "description": "Check L7 protocol parsing status.", + "mode": "all" + }, + { + "name": "enable_dissection", + "description": "Enable L7 protocol dissection.", + "mode": "all" + }, + { + "name": "disable_dissection", + "description": "Disable L7 protocol dissection.", + "mode": "all" + } + ], + "prompts": [ + { + "name": "analyze_traffic", + "description": "Analyze API traffic patterns and identify issues" + }, + { + "name": "find_errors", + "description": "Find and summarize API errors and failures" + }, + { + "name": "trace_request", + "description": "Trace a request path through microservices" + }, + { + "name": "show_topology", + "description": "Show service communication topology" + }, + { + "name": "latency_analysis", + "description": "Analyze latency patterns and identify slow endpoints" + }, + 
{ + "name": "security_audit", + "description": "Audit traffic for security concerns" + }, + { + "name": "compare_traffic", + "description": "Compare traffic patterns between time periods" + }, + { + "name": "debug_connection", + "description": "Debug connectivity issues between services" + } + ], + "configuration": { + "properties": { + "url": { + "type": "string", + "description": "Direct URL to Kubeshark Hub (e.g., https://kubeshark.example.com). When set, connects directly without kubectl/proxy.", + "examples": ["https://kubeshark.example.com", "http://localhost:8899"] + }, + "kubeconfig": { + "type": "string", + "description": "Path to kubeconfig file for proxy mode.", + "examples": ["~/.kube/config", "/path/to/.kube/config"] + }, + "allow-destructive": { + "type": "boolean", + "description": "Enable destructive operations (start_kubeshark, stop_kubeshark). Default: false for safety.", + "default": false + } + } + }, + "modes": { + "url": { + "description": "Connect directly to an existing Kubeshark deployment via URL. Cluster management tools are disabled.", + "args": ["mcp", "--url", "${url}"] + }, + "proxy": { + "description": "Connect via kubectl port-forward. Requires kubeconfig access to the cluster.", + "args": ["mcp", "--kubeconfig", "${kubeconfig}"] + }, + "proxy-destructive": { + "description": "Proxy mode with destructive operations enabled.", + "args": ["mcp", "--kubeconfig", "${kubeconfig}", "--allow-destructive"] + } + } +}
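
The tests above all drive the same wire format: one JSON-RPC 2.0 message per line over the server's stdin/stdout. The sketch below shows only that framing from the client side; the `frame` helper and the `demo` client name are illustrative, not part of Kubeshark.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// request mirrors the MCPRequest shape used in the integration tests.
type request struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      int         `json:"id"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params,omitempty"`
}

// frame serializes one request as a single newline-terminated JSON line,
// the framing the stdio transport expects.
func frame(req request) string {
	b, err := json.Marshal(req)
	if err != nil {
		panic(err) // marshaling these static shapes cannot fail in practice
	}
	return string(b) + "\n"
}

func main() {
	// The initialize handshake every session begins with.
	fmt.Print(frame(request{
		JSONRPC: "2.0",
		ID:      1,
		Method:  "initialize",
		Params: map[string]interface{}{
			"protocolVersion": "2024-11-05",
			"capabilities":    map[string]interface{}{},
			"clientInfo":      map[string]interface{}{"name": "demo", "version": "1.0"},
		},
	}))
	// A follow-up call to the read-only status tool.
	fmt.Print(frame(request{
		JSONRPC: "2.0",
		ID:      2,
		Method:  "tools/call",
		Params:  map[string]interface{}{"name": "check_kubeshark_status", "arguments": map[string]interface{}{}},
	}))
}
```

Piping this output into `kubeshark mcp` should produce one JSON response line per request on stdout, matching what the `sendRequest` helper in `mcp_test.go` reads back.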