Merge branch 'master' into mcp-unit-tests

This commit is contained in:
Alon Girmonsky
2026-02-06 11:15:01 -08:00
committed by GitHub
6 changed files with 1256 additions and 0 deletions

View File

@@ -74,6 +74,69 @@ clean: ## Clean all build artifacts.
test: ## Run cli tests.
@go test ./... -coverpkg=./... -race -coverprofile=coverage.out -covermode=atomic
test-integration: ## Run integration tests (requires Kubernetes cluster).
@echo "Running integration tests..."
@LOG_FILE=$$(mktemp /tmp/integration-test.XXXXXX.log); \
go test -tags=integration -timeout $${INTEGRATION_TIMEOUT:-5m} -v ./integration/... 2>&1 | tee $$LOG_FILE; \
status=$$?; \
echo ""; \
echo "========================================"; \
echo " INTEGRATION TEST SUMMARY"; \
echo "========================================"; \
grep -E "^(--- PASS|--- FAIL|--- SKIP)" $$LOG_FILE || true; \
echo "----------------------------------------"; \
pass=$$(grep -c "^--- PASS" $$LOG_FILE 2>/dev/null || true); \
fail=$$(grep -c "^--- FAIL" $$LOG_FILE 2>/dev/null || true); \
skip=$$(grep -c "^--- SKIP" $$LOG_FILE 2>/dev/null || true); \
echo "PASSED: $${pass:-0}"; \
echo "FAILED: $${fail:-0}"; \
echo "SKIPPED: $${skip:-0}"; \
echo "========================================"; \
rm -f $$LOG_FILE; \
exit $$status
test-integration-mcp: ## Run only MCP integration tests.
@echo "Running MCP integration tests..."
@LOG_FILE=$$(mktemp /tmp/integration-test.XXXXXX.log); \
go test -tags=integration -timeout $${INTEGRATION_TIMEOUT:-5m} -v ./integration/ -run "MCP" 2>&1 | tee $$LOG_FILE; \
status=$$?; \
echo ""; \
echo "========================================"; \
echo " INTEGRATION TEST SUMMARY"; \
echo "========================================"; \
grep -E "^(--- PASS|--- FAIL|--- SKIP)" $$LOG_FILE || true; \
echo "----------------------------------------"; \
pass=$$(grep -c "^--- PASS" $$LOG_FILE 2>/dev/null || true); \
fail=$$(grep -c "^--- FAIL" $$LOG_FILE 2>/dev/null || true); \
skip=$$(grep -c "^--- SKIP" $$LOG_FILE 2>/dev/null || true); \
echo "PASSED: $${pass:-0}"; \
echo "FAILED: $${fail:-0}"; \
echo "SKIPPED: $${skip:-0}"; \
echo "========================================"; \
rm -f $$LOG_FILE; \
exit $$status
test-integration-short: ## Run quick integration tests (skips long-running tests).
@echo "Running quick integration tests..."
@LOG_FILE=$$(mktemp /tmp/integration-test.XXXXXX.log); \
go test -tags=integration -timeout $${INTEGRATION_TIMEOUT:-2m} -short -v ./integration/... 2>&1 | tee $$LOG_FILE; \
status=$$?; \
echo ""; \
echo "========================================"; \
echo " INTEGRATION TEST SUMMARY"; \
echo "========================================"; \
grep -E "^(--- PASS|--- FAIL|--- SKIP)" $$LOG_FILE || true; \
echo "----------------------------------------"; \
pass=$$(grep -c "^--- PASS" $$LOG_FILE 2>/dev/null || true); \
fail=$$(grep -c "^--- FAIL" $$LOG_FILE 2>/dev/null || true); \
skip=$$(grep -c "^--- SKIP" $$LOG_FILE 2>/dev/null || true); \
echo "PASSED: $${pass:-0}"; \
echo "FAILED: $${fail:-0}"; \
echo "SKIPPED: $${skip:-0}"; \
echo "========================================"; \
rm -f $$LOG_FILE; \
exit $$status
lint: ## Lint the source code.
golangci-lint run

57
integration/README.md Normal file
View File

@@ -0,0 +1,57 @@
# Integration Tests
This directory contains integration tests that run against a real Kubernetes cluster.
## Prerequisites
1. **Kubernetes cluster** - A running cluster accessible via `kubectl`
2. **kubectl** - Configured with appropriate context
3. **Go 1.21+** - For running tests
## Running Tests
```bash
# Run all integration tests
make test-integration
# Run specific command tests
make test-integration-mcp
# Run with verbose output
make test-integration-verbose
# Run with custom timeout (default: 5m)
INTEGRATION_TIMEOUT=10m make test-integration
```
## Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `KUBESHARK_BINARY` | Auto-built | Path to pre-built kubeshark binary |
| `INTEGRATION_TIMEOUT` | `5m` | Test timeout duration |
| `KUBECONFIG` | `~/.kube/config` | Kubernetes config file |
| `INTEGRATION_SKIP_CLEANUP` | `false` | Skip cleanup after tests (for debugging) |
## Test Structure
```
integration/
├── README.md # This file
├── common_test.go # Shared test helpers
├── mcp_test.go # MCP command integration tests
├── tap_test.go # Tap command tests (future)
└── ... # Additional command tests
```
## Writing New Tests
1. Create `<command>_test.go` with build tag `//go:build integration`
2. Use helpers from `common_test.go`: `requireKubernetesCluster(t)`, `getKubesharkBinary(t)`, `cleanupKubeshark(t, binary)`
## CI/CD Integration
```bash
# JSON output for CI parsing
go test -tags=integration -json ./integration/...
```

217
integration/common_test.go Normal file
View File

@@ -0,0 +1,217 @@
//go:build integration
// Package integration contains integration tests that run against a real Kubernetes cluster.
// Run with: go test -tags=integration ./integration/...
package integration
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"testing"
"time"
)
const (
binaryName = "kubeshark"
defaultTimeout = 2 * time.Minute
startupTimeout = 3 * time.Minute
)
var (
// binaryPath caches the built binary path
binaryPath string
buildOnce sync.Once
buildErr error
)
// requireKubernetesCluster skips the test if no Kubernetes cluster is available.
func requireKubernetesCluster(t *testing.T) {
t.Helper()
if !hasKubernetesCluster() {
t.Skip("Skipping: no Kubernetes cluster available")
}
}
// hasKubernetesCluster returns true if a Kubernetes cluster is accessible.
func hasKubernetesCluster() bool {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return exec.CommandContext(ctx, "kubectl", "cluster-info").Run() == nil
}
// getKubesharkBinary returns the path to the kubeshark binary, building it if necessary.
func getKubesharkBinary(t *testing.T) string {
t.Helper()
// Check if binary path is provided via environment
if envBinary := os.Getenv("KUBESHARK_BINARY"); envBinary != "" {
if _, err := os.Stat(envBinary); err == nil {
return envBinary
}
t.Fatalf("KUBESHARK_BINARY set but file not found: %s", envBinary)
}
// Build once per test run
buildOnce.Do(func() {
binaryPath, buildErr = buildBinary(t)
})
if buildErr != nil {
t.Fatalf("Failed to build binary: %v", buildErr)
}
return binaryPath
}
// buildBinary compiles the binary and returns its path.
func buildBinary(t *testing.T) (string, error) {
t.Helper()
// Find project root (directory containing go.mod)
projectRoot, err := findProjectRoot()
if err != nil {
return "", fmt.Errorf("finding project root: %w", err)
}
outputPath := filepath.Join(projectRoot, "bin", binaryName+"_integration_test")
t.Logf("Building %s binary at %s", binaryName, outputPath)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
cmd := exec.CommandContext(ctx, "go", "build",
"-o", outputPath,
filepath.Join(projectRoot, binaryName+".go"),
)
cmd.Dir = projectRoot
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("build failed: %w\nOutput: %s", err, output)
}
return outputPath, nil
}
// findProjectRoot locates the project root by finding go.mod
func findProjectRoot() (string, error) {
dir, err := os.Getwd()
if err != nil {
return "", err
}
for {
if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil {
return dir, nil
}
parent := filepath.Dir(dir)
if parent == dir {
return "", fmt.Errorf("could not find go.mod in any parent directory")
}
dir = parent
}
}
// runKubeshark executes the kubeshark binary with the given arguments.
// Returns combined stdout/stderr and any error.
func runKubeshark(t *testing.T, binary string, args ...string) (string, error) {
t.Helper()
return runKubesharkWithTimeout(t, binary, defaultTimeout, args...)
}
// runKubesharkWithTimeout executes the kubeshark binary with a custom timeout.
func runKubesharkWithTimeout(t *testing.T, binary string, timeout time.Duration, args ...string) (string, error) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
t.Logf("Running: %s %s", binary, strings.Join(args, " "))
cmd := exec.CommandContext(ctx, binary, args...)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
output := stdout.String()
if stderr.Len() > 0 {
output += "\n[stderr]\n" + stderr.String()
}
if ctx.Err() == context.DeadlineExceeded {
return output, fmt.Errorf("command timed out after %v", timeout)
}
return output, err
}
// cleanupKubeshark ensures Kubeshark is not running in the cluster.
func cleanupKubeshark(t *testing.T, binary string) {
t.Helper()
if os.Getenv("INTEGRATION_SKIP_CLEANUP") == "true" {
t.Log("Skipping cleanup (INTEGRATION_SKIP_CLEANUP=true)")
return
}
t.Log("Cleaning up any existing Kubeshark installation...")
// Run clean command, ignore errors (might not be installed)
_, _ = runKubeshark(t, binary, "clean")
// Wait a moment for resources to be deleted
time.Sleep(2 * time.Second)
}
// waitForKubesharkReady waits for Kubeshark to be ready after starting.
func waitForKubesharkReady(t *testing.T, binary string, timeout time.Duration) error {
t.Helper()
t.Log("Waiting for Kubeshark to be ready...")
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
// Check if pods are running
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
cmd := exec.CommandContext(ctx, "kubectl", "get", "pods", "-l", "app.kubernetes.io/name=kubeshark", "-o", "jsonpath={.items[*].status.phase}")
output, err := cmd.Output()
cancel()
if err == nil && strings.Contains(string(output), "Running") {
t.Log("Kubeshark is ready")
return nil
}
time.Sleep(5 * time.Second)
}
return fmt.Errorf("timeout waiting for Kubeshark to be ready")
}
// isKubesharkRunning checks if Kubeshark is currently running in the cluster.
func isKubesharkRunning(t *testing.T) bool {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, "kubectl", "get", "pods", "-l", "app.kubernetes.io/name=kubeshark", "-o", "name")
output, err := cmd.Output()
if err != nil {
return false
}
return strings.TrimSpace(string(output)) != ""
}

529
integration/mcp_test.go Normal file
View File

@@ -0,0 +1,529 @@
//go:build integration
package integration
import (
"bufio"
"bytes"
"context"
"encoding/json"
"io"
"os/exec"
"strings"
"testing"
"time"
)
// MCPRequest represents a JSON-RPC request
type MCPRequest struct {
JSONRPC string `json:"jsonrpc"`
ID int `json:"id"`
Method string `json:"method"`
Params interface{} `json:"params,omitempty"`
}
// MCPResponse represents a JSON-RPC response
type MCPResponse struct {
JSONRPC string `json:"jsonrpc"`
ID int `json:"id"`
Result json.RawMessage `json:"result,omitempty"`
Error *MCPError `json:"error,omitempty"`
}
// MCPError represents a JSON-RPC error
type MCPError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// mcpSession represents a running MCP server session
type mcpSession struct {
cmd *exec.Cmd
stdin io.WriteCloser
stdout *bufio.Reader
stderr *bytes.Buffer // Captured stderr for debugging
cancel context.CancelFunc
}
// startMCPSession starts an MCP server and returns a session for sending requests.
// By default, starts in read-only mode (no --allow-destructive).
func startMCPSession(t *testing.T, binary string, args ...string) *mcpSession {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
cmdArgs := append([]string{"mcp"}, args...)
cmd := exec.CommandContext(ctx, binary, cmdArgs...)
stdin, err := cmd.StdinPipe()
if err != nil {
cancel()
t.Fatalf("Failed to create stdin pipe: %v", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
t.Fatalf("Failed to create stdout pipe: %v", err)
}
// Capture stderr for debugging
var stderrBuf bytes.Buffer
cmd.Stderr = &stderrBuf
if err := cmd.Start(); err != nil {
cancel()
t.Fatalf("Failed to start MCP server: %v", err)
}
return &mcpSession{
cmd: cmd,
stdin: stdin,
stdout: bufio.NewReader(stdout),
stderr: &stderrBuf,
cancel: cancel,
}
}
// startMCPSessionWithDestructive starts an MCP server with --allow-destructive flag.
func startMCPSessionWithDestructive(t *testing.T, binary string, args ...string) *mcpSession {
t.Helper()
allArgs := append([]string{"--allow-destructive"}, args...)
return startMCPSession(t, binary, allArgs...)
}
// sendRequest sends a JSON-RPC request and returns the response (30s timeout).
func (s *mcpSession) sendRequest(t *testing.T, req MCPRequest) MCPResponse {
t.Helper()
return s.sendRequestWithTimeout(t, req, 30*time.Second)
}
// sendRequestWithTimeout sends a JSON-RPC request with a custom timeout.
func (s *mcpSession) sendRequestWithTimeout(t *testing.T, req MCPRequest, timeout time.Duration) MCPResponse {
t.Helper()
reqBytes, err := json.Marshal(req)
if err != nil {
t.Fatalf("Failed to marshal request: %v", err)
}
t.Logf("Sending: %s", string(reqBytes))
if _, err := s.stdin.Write(append(reqBytes, '\n')); err != nil {
t.Fatalf("Failed to write request: %v", err)
}
// Read response with timeout
responseChan := make(chan string, 1)
errChan := make(chan error, 1)
go func() {
line, err := s.stdout.ReadString('\n')
if err != nil {
errChan <- err
return
}
responseChan <- line
}()
select {
case line := <-responseChan:
t.Logf("Received: %s", strings.TrimSpace(line))
var resp MCPResponse
if err := json.Unmarshal([]byte(line), &resp); err != nil {
t.Fatalf("Failed to unmarshal response: %v\nResponse: %s", err, line)
}
return resp
case err := <-errChan:
t.Fatalf("Failed to read response: %v", err)
return MCPResponse{}
case <-time.After(timeout):
t.Fatalf("Timeout waiting for MCP response after %v", timeout)
return MCPResponse{}
}
}
// callTool invokes an MCP tool and returns the response (30s timeout).
func (s *mcpSession) callTool(t *testing.T, id int, toolName string, args map[string]interface{}) MCPResponse {
t.Helper()
return s.callToolWithTimeout(t, id, toolName, args, 30*time.Second)
}
// callToolWithTimeout invokes an MCP tool with a custom timeout.
func (s *mcpSession) callToolWithTimeout(t *testing.T, id int, toolName string, args map[string]interface{}, timeout time.Duration) MCPResponse {
t.Helper()
return s.sendRequestWithTimeout(t, MCPRequest{
JSONRPC: "2.0",
ID: id,
Method: "tools/call",
Params: map[string]interface{}{
"name": toolName,
"arguments": args,
},
}, timeout)
}
// close terminates the MCP session.
func (s *mcpSession) close() {
s.cancel()
_ = s.cmd.Wait()
}
// getStderr returns any captured stderr output (useful for debugging failures).
func (s *mcpSession) getStderr() string {
if s.stderr == nil {
return ""
}
return s.stderr.String()
}
// initialize sends the MCP initialize request and returns the response.
func (s *mcpSession) initialize(t *testing.T, id int) MCPResponse {
t.Helper()
return s.sendRequest(t, MCPRequest{
JSONRPC: "2.0",
ID: id,
Method: "initialize",
Params: map[string]interface{}{
"protocolVersion": "2024-11-05",
"capabilities": map[string]interface{}{},
"clientInfo": map[string]interface{}{"name": "test", "version": "1.0"},
},
})
}
// TestMCP_Initialize tests the MCP initialization handshake.
func TestMCP_Initialize(t *testing.T) {
requireKubernetesCluster(t)
session := startMCPSession(t, getKubesharkBinary(t))
defer session.close()
resp := session.initialize(t, 1)
if resp.Error != nil {
t.Fatalf("Initialize failed: %s", resp.Error.Message)
}
var result map[string]interface{}
if err := json.Unmarshal(resp.Result, &result); err != nil {
t.Fatalf("Failed to parse result: %v", err)
}
if _, ok := result["capabilities"]; !ok {
t.Error("Response missing capabilities")
}
if _, ok := result["serverInfo"]; !ok {
t.Error("Response missing serverInfo")
}
}
// TestMCP_ToolsList_ReadOnly tests that tools/list returns only safe tools in read-only mode.
func TestMCP_ToolsList_ReadOnly(t *testing.T) {
requireKubernetesCluster(t)
session := startMCPSession(t, getKubesharkBinary(t))
defer session.close()
session.initialize(t, 1)
resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"})
if resp.Error != nil {
t.Fatalf("tools/list failed: %s", resp.Error.Message)
}
var result struct {
Tools []struct{ Name string `json:"name"` } `json:"tools"`
}
if err := json.Unmarshal(resp.Result, &result); err != nil {
t.Fatalf("Failed to parse result: %v", err)
}
toolNames := make(map[string]bool)
for _, tool := range result.Tools {
toolNames[tool.Name] = true
}
if !toolNames["check_kubeshark_status"] {
t.Error("Missing expected tool: check_kubeshark_status")
}
if toolNames["start_kubeshark"] || toolNames["stop_kubeshark"] {
t.Error("Destructive tools should not be available in read-only mode")
}
}
// TestMCP_ToolsList_WithDestructive tests that tools/list includes destructive tools when flag is set.
func TestMCP_ToolsList_WithDestructive(t *testing.T) {
requireKubernetesCluster(t)
session := startMCPSessionWithDestructive(t, getKubesharkBinary(t))
defer session.close()
session.initialize(t, 1)
resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"})
if resp.Error != nil {
t.Fatalf("tools/list failed: %s", resp.Error.Message)
}
var result struct {
Tools []struct{ Name string `json:"name"` } `json:"tools"`
}
if err := json.Unmarshal(resp.Result, &result); err != nil {
t.Fatalf("Failed to parse result: %v", err)
}
toolNames := make(map[string]bool)
for _, tool := range result.Tools {
toolNames[tool.Name] = true
}
for _, expected := range []string{"check_kubeshark_status", "start_kubeshark", "stop_kubeshark"} {
if !toolNames[expected] {
t.Errorf("Missing expected tool: %s", expected)
}
}
}
// TestMCP_CheckKubesharkStatus_NotRunning tests check_kubeshark_status when Kubeshark is not running.
func TestMCP_CheckKubesharkStatus_NotRunning(t *testing.T) {
requireKubernetesCluster(t)
binary := getKubesharkBinary(t)
cleanupKubeshark(t, binary)
session := startMCPSession(t, binary)
defer session.close()
session.initialize(t, 1)
resp := session.callTool(t, 2, "check_kubeshark_status", nil)
if resp.Error != nil {
t.Fatalf("check_kubeshark_status failed: %s", resp.Error.Message)
}
var result struct {
Content []struct{ Text string `json:"text"` } `json:"content"`
}
if err := json.Unmarshal(resp.Result, &result); err != nil {
t.Fatalf("Failed to parse result: %v", err)
}
if len(result.Content) == 0 || (!strings.Contains(result.Content[0].Text, "not running") && !strings.Contains(result.Content[0].Text, "NOT")) {
t.Errorf("Expected 'not running' status")
}
}
// TestMCP_StartKubeshark tests the start_kubeshark tool.
func TestMCP_StartKubeshark(t *testing.T) {
if testing.Short() {
t.Skip("Skipping in short mode")
}
requireKubernetesCluster(t)
binary := getKubesharkBinary(t)
cleanupKubeshark(t, binary)
t.Cleanup(func() { cleanupKubeshark(t, binary) })
session := startMCPSessionWithDestructive(t, binary)
defer session.close()
session.initialize(t, 1)
resp := session.callToolWithTimeout(t, 2, "start_kubeshark", nil, 3*time.Minute)
if resp.Error != nil {
t.Fatalf("start_kubeshark failed: %s", resp.Error.Message)
}
if !isKubesharkRunning(t) {
t.Error("Kubeshark should be running after start_kubeshark")
}
}
// TestMCP_StartKubeshark_WithoutFlag tests that start_kubeshark fails without --allow-destructive.
func TestMCP_StartKubeshark_WithoutFlag(t *testing.T) {
requireKubernetesCluster(t)
session := startMCPSession(t, getKubesharkBinary(t))
defer session.close()
session.initialize(t, 1)
resp := session.callTool(t, 2, "start_kubeshark", nil)
var result struct {
Content []struct{ Text string `json:"text"` } `json:"content"`
IsError bool `json:"isError"`
}
if err := json.Unmarshal(resp.Result, &result); err != nil {
t.Fatalf("Failed to parse result: %v", err)
}
if !result.IsError {
t.Error("Expected isError=true without --allow-destructive")
}
}
// TestMCP_StopKubeshark tests the stop_kubeshark tool.
func TestMCP_StopKubeshark(t *testing.T) {
if testing.Short() {
t.Skip("Skipping in short mode")
}
requireKubernetesCluster(t)
binary := getKubesharkBinary(t)
session := startMCPSessionWithDestructive(t, binary)
defer session.close()
session.initialize(t, 0)
// Start Kubeshark if not running
if !isKubesharkRunning(t) {
resp := session.callToolWithTimeout(t, 1, "start_kubeshark", nil, 2*time.Minute)
if resp.Error != nil {
t.Skipf("Could not start Kubeshark: %v", resp.Error.Message)
}
}
resp := session.callToolWithTimeout(t, 2, "stop_kubeshark", nil, 2*time.Minute)
if resp.Error != nil {
t.Fatalf("stop_kubeshark failed: %s", resp.Error.Message)
}
time.Sleep(5 * time.Second)
if isKubesharkRunning(t) {
t.Error("Kubeshark should not be running after stop_kubeshark")
}
}
// TestMCP_StopKubeshark_WithoutFlag tests that stop_kubeshark fails without --allow-destructive.
func TestMCP_StopKubeshark_WithoutFlag(t *testing.T) {
requireKubernetesCluster(t)
session := startMCPSession(t, getKubesharkBinary(t))
defer session.close()
session.initialize(t, 1)
resp := session.callTool(t, 2, "stop_kubeshark", nil)
var result struct {
IsError bool `json:"isError"`
}
if err := json.Unmarshal(resp.Result, &result); err != nil {
t.Fatalf("Failed to parse result: %v", err)
}
if !result.IsError {
t.Error("Expected isError=true without --allow-destructive")
}
}
// TestMCP_FullLifecycle tests the complete lifecycle: check -> start -> check -> stop -> check
func TestMCP_FullLifecycle(t *testing.T) {
if testing.Short() {
t.Skip("Skipping in short mode")
}
requireKubernetesCluster(t)
binary := getKubesharkBinary(t)
cleanupKubeshark(t, binary)
session := startMCPSessionWithDestructive(t, binary)
defer session.close()
session.initialize(t, 1)
// Check -> Start -> Check -> Stop -> Check
if resp := session.callTool(t, 2, "check_kubeshark_status", nil); resp.Error != nil {
t.Fatalf("Initial status check failed: %s", resp.Error.Message)
}
if resp := session.callToolWithTimeout(t, 3, "start_kubeshark", nil, 3*time.Minute); resp.Error != nil {
t.Fatalf("Start failed: %s", resp.Error.Message)
}
if err := waitForKubesharkReady(t, binary, startupTimeout); err != nil {
t.Fatalf("Kubeshark did not become ready: %v", err)
}
if resp := session.callTool(t, 4, "check_kubeshark_status", nil); resp.Error != nil {
t.Fatalf("Status check after start failed: %s", resp.Error.Message)
}
if resp := session.callToolWithTimeout(t, 5, "stop_kubeshark", nil, 2*time.Minute); resp.Error != nil {
t.Fatalf("Stop failed: %s", resp.Error.Message)
}
time.Sleep(5 * time.Second)
if resp := session.callTool(t, 6, "check_kubeshark_status", nil); resp.Error != nil {
t.Fatalf("Final status check failed: %s", resp.Error.Message)
}
}
// TestMCP_APIToolsRequireKubeshark tests that API tools return helpful errors when Kubeshark isn't running.
func TestMCP_APIToolsRequireKubeshark(t *testing.T) {
requireKubernetesCluster(t)
binary := getKubesharkBinary(t)
cleanupKubeshark(t, binary)
session := startMCPSession(t, binary)
defer session.close()
session.initialize(t, 1)
for i, tool := range []string{"list_workloads", "list_api_calls", "get_api_stats"} {
resp := session.callTool(t, i+2, tool, nil)
// Either error or helpful message is acceptable
if resp.Error != nil {
t.Logf("%s returned error (expected): %s", tool, resp.Error.Message)
}
}
}
// TestMCP_SetFlags tests that --set flags are passed correctly.
func TestMCP_SetFlags(t *testing.T) {
requireKubernetesCluster(t)
session := startMCPSession(t, getKubesharkBinary(t), "--set", "tap.namespaces={default}")
defer session.close()
session.initialize(t, 1)
resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"})
if resp.Error != nil {
t.Fatalf("tools/list failed with --set flags: %s", resp.Error.Message)
}
}
// BenchmarkMCP_CheckStatus benchmarks the check_kubeshark_status tool.
func BenchmarkMCP_CheckStatus(b *testing.B) {
if testing.Short() {
b.Skip("Skipping benchmark in short mode")
}
if !hasKubernetesCluster() {
b.Skip("Skipping: no Kubernetes cluster available")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := exec.CommandContext(ctx, getKubesharkBinary(b), "mcp")
stdin, _ := cmd.StdinPipe()
stdout, _ := cmd.StdoutPipe()
reader := bufio.NewReader(stdout)
if err := cmd.Start(); err != nil {
b.Fatalf("Failed to start MCP: %v", err)
}
defer func() { cancel(); _ = cmd.Wait() }()
// Initialize
initReq, _ := json.Marshal(MCPRequest{
JSONRPC: "2.0", ID: 0, Method: "initialize",
Params: map[string]interface{}{
"protocolVersion": "2024-11-05",
"capabilities": map[string]interface{}{},
"clientInfo": map[string]interface{}{"name": "bench", "version": "1.0"},
},
})
_, _ = stdin.Write(append(initReq, '\n'))
_, _ = reader.ReadString('\n')
b.ResetTimer()
for i := 0; i < b.N; i++ {
req, _ := json.Marshal(MCPRequest{
JSONRPC: "2.0", ID: i + 1, Method: "tools/call",
Params: map[string]interface{}{"name": "check_kubeshark_status", "arguments": map[string]interface{}{}},
})
if _, err := stdin.Write(append(req, '\n')); err != nil {
b.Fatalf("Write failed: %v", err)
}
if _, err := reader.ReadString('\n'); err != nil {
b.Fatalf("Read failed: %v", err)
}
}
}

185
mcp/README.md Normal file
View File

@@ -0,0 +1,185 @@
# Kubeshark MCP Server
[Kubeshark](https://kubeshark.com) MCP (Model Context Protocol) server enables AI assistants like Claude Desktop, Cursor, and other MCP-compatible clients to query real-time Kubernetes network traffic.
## Features
- **L7 API Traffic Analysis**: Query HTTP, gRPC, Redis, Kafka, DNS transactions
- **L4 Network Flows**: View TCP/UDP flows with traffic statistics
- **Cluster Management**: Start/stop Kubeshark deployments (with safety controls)
- **PCAP Snapshots**: Create and export network captures
- **Built-in Prompts**: Pre-configured prompts for common analysis tasks
## Installation
### 1. Install Kubeshark CLI
```bash
# macOS
brew install kubeshark
# Linux
sh <(curl -Ls https://kubeshark.com/install)
# Windows (PowerShell)
choco install kubeshark
```
Or download from [GitHub Releases](https://github.com/kubeshark/kubeshark/releases).
### 2. Configure Claude Desktop
Add to your Claude Desktop configuration:
**macOS**: `~/Library/Application Support/Claude/claude_desktop_config.json`
**Windows**: `%APPDATA%\Claude\claude_desktop_config.json`
#### URL Mode (Recommended for existing deployments)
```json
{
"mcpServers": {
"kubeshark": {
"command": "kubeshark",
"args": ["mcp", "--url", "https://kubeshark.example.com"]
}
}
}
```
#### Proxy Mode (Requires kubectl access)
```json
{
"mcpServers": {
"kubeshark": {
"command": "kubeshark",
"args": ["mcp", "--kubeconfig", "/path/to/.kube/config"]
}
}
}
```
#### With Destructive Operations
```json
{
"mcpServers": {
"kubeshark": {
"command": "kubeshark",
"args": ["mcp", "--allow-destructive", "--kubeconfig", "/path/to/.kube/config"]
}
}
}
```
### 3. Generate Configuration
Use the CLI to generate configuration:
```bash
kubeshark mcp --mcp-config --url https://kubeshark.example.com
```
## Available Tools
### Traffic Analysis (All Modes)
| Tool | Description |
|------|-------------|
| `list_workloads` | List pods, services, namespaces with observed traffic |
| `list_api_calls` | Query L7 API transactions with KFL filtering |
| `get_api_call` | Get detailed info about a specific API call |
| `get_api_stats` | Get aggregated API statistics |
| `list_l4_flows` | List L4 (TCP/UDP) network flows |
| `get_l4_flow_summary` | Get L4 connectivity summary |
| `list_snapshots` | List all PCAP snapshots |
| `create_snapshot` | Create a new PCAP snapshot |
| `get_dissection_status` | Check L7 protocol parsing status |
| `enable_dissection` | Enable L7 protocol dissection |
| `disable_dissection` | Disable L7 protocol dissection |
### Cluster Management (Proxy Mode Only)
| Tool | Description | Requires |
|------|-------------|----------|
| `check_kubeshark_status` | Check if Kubeshark is running | - |
| `start_kubeshark` | Deploy Kubeshark to cluster | `--allow-destructive` |
| `stop_kubeshark` | Remove Kubeshark from cluster | `--allow-destructive` |
## Available Prompts
| Prompt | Description |
|--------|-------------|
| `analyze_traffic` | Analyze API traffic patterns and identify issues |
| `find_errors` | Find and summarize API errors and failures |
| `trace_request` | Trace a request path through microservices |
| `show_topology` | Show service communication topology |
| `latency_analysis` | Analyze latency patterns and identify slow endpoints |
| `security_audit` | Audit traffic for security concerns |
| `compare_traffic` | Compare traffic patterns between time periods |
| `debug_connection` | Debug connectivity issues between services |
## Example Conversations
```
User: Show me all HTTP 500 errors in the last hour
Claude: I'll query the API traffic for 500 errors.
[Calling list_api_calls with kfl="http and response.status == 500"]
Found 12 HTTP 500 errors:
1. POST /api/checkout -> payment-service (500)
Time: 10:23:45 | Latency: 2340ms
...
```
```
User: What services are communicating with the database?
Claude: Let me check the L4 flows to the database.
[Calling list_l4_flows with dst_filter="postgres"]
Found 5 services connecting to postgres:5432:
- orders-service: 456KB transferred
- users-service: 123KB transferred
...
```
## CLI Options
| Option | Description |
|--------|-------------|
| `--url` | Direct URL to Kubeshark Hub |
| `--kubeconfig` | Path to kubeconfig file |
| `--allow-destructive` | Enable start/stop operations |
| `--list-tools` | List available tools and exit |
| `--mcp-config` | Print Claude Desktop config JSON |
## KFL (Kubeshark Filter Language)
Query traffic using KFL syntax:
```
# HTTP requests to a specific path
http and request.path == "/api/users"
# Errors only
response.status >= 400
# Specific source pod
src.pod.name == "frontend-.*"
# Multiple conditions
http and src.namespace == "default" and response.status == 500
```
## Links
- [Documentation](https://docs.kubeshark.com/en/mcp)
- [GitHub](https://github.com/kubeshark/kubeshark)
- [Website](https://kubeshark.com)
## License
Apache-2.0

205
mcp/server.json Normal file
View File

@@ -0,0 +1,205 @@
{
"$schema": "https://registry.modelcontextprotocol.io/schemas/server.schema.json",
"name": "com.kubeshark/mcp",
"displayName": "Kubeshark",
"description": "Real-time Kubernetes network traffic visibility and API analysis. Query L7 API transactions (HTTP, gRPC, Redis, Kafka, DNS), L4 network flows, and manage Kubeshark deployments directly from AI assistants.",
"icon": "https://kubeshark.com/favicon.ico",
"repository": {
"url": "https://github.com/kubeshark/kubeshark",
"source": "github"
},
"homepage": "https://kubeshark.com",
"license": "Apache-2.0",
"version": "52.12.0",
"authors": [
{
"name": "Kubeshark",
"url": "https://kubeshark.com"
}
],
"categories": [
"kubernetes",
"networking",
"observability",
"debugging",
"security"
],
"tags": [
"kubernetes",
"network",
"traffic",
"api",
"http",
"grpc",
"kafka",
"redis",
"dns",
"pcap",
"wireshark",
"tcpdump",
"observability",
"debugging",
"microservices"
],
"packages": [
{
"registryType": "github-releases",
"name": "kubeshark/kubeshark",
"version": "52.12.0",
"runtime": "binary",
"platforms": [
"darwin-arm64",
"darwin-amd64",
"linux-arm64",
"linux-amd64",
"windows-amd64"
],
"transport": {
"type": "stdio",
"command": "kubeshark",
"args": ["mcp"]
}
}
],
"tools": [
{
"name": "check_kubeshark_status",
"description": "Check if Kubeshark is currently running in the cluster. Read-only operation.",
"mode": "proxy"
},
{
"name": "start_kubeshark",
"description": "Deploy Kubeshark to the Kubernetes cluster. Requires --allow-destructive flag.",
"mode": "proxy",
"destructive": true
},
{
"name": "stop_kubeshark",
"description": "Remove Kubeshark from the Kubernetes cluster. Requires --allow-destructive flag.",
"mode": "proxy",
"destructive": true
},
{
"name": "list_workloads",
"description": "List pods, services, namespaces, and nodes with observed L7 traffic.",
"mode": "all"
},
{
"name": "list_api_calls",
"description": "Query L7 API transactions (HTTP, gRPC, Redis, Kafka, DNS) with KFL filtering.",
"mode": "all"
},
{
"name": "get_api_call",
"description": "Get detailed information about a specific API call including headers and body.",
"mode": "all"
},
{
"name": "get_api_stats",
"description": "Get aggregated API statistics and metrics.",
"mode": "all"
},
{
"name": "list_l4_flows",
"description": "List L4 (TCP/UDP) network flows with traffic statistics.",
"mode": "all"
},
{
"name": "get_l4_flow_summary",
"description": "Get L4 connectivity summary including top talkers and cross-namespace traffic.",
"mode": "all"
},
{
"name": "list_snapshots",
"description": "List all PCAP snapshots.",
"mode": "all"
},
{
"name": "create_snapshot",
"description": "Create a new PCAP snapshot of captured traffic.",
"mode": "all"
},
{
"name": "get_dissection_status",
"description": "Check L7 protocol parsing status.",
"mode": "all"
},
{
"name": "enable_dissection",
"description": "Enable L7 protocol dissection.",
"mode": "all"
},
{
"name": "disable_dissection",
"description": "Disable L7 protocol dissection.",
"mode": "all"
}
],
"prompts": [
{
"name": "analyze_traffic",
"description": "Analyze API traffic patterns and identify issues"
},
{
"name": "find_errors",
"description": "Find and summarize API errors and failures"
},
{
"name": "trace_request",
"description": "Trace a request path through microservices"
},
{
"name": "show_topology",
"description": "Show service communication topology"
},
{
"name": "latency_analysis",
"description": "Analyze latency patterns and identify slow endpoints"
},
{
"name": "security_audit",
"description": "Audit traffic for security concerns"
},
{
"name": "compare_traffic",
"description": "Compare traffic patterns between time periods"
},
{
"name": "debug_connection",
"description": "Debug connectivity issues between services"
}
],
"configuration": {
"properties": {
"url": {
"type": "string",
"description": "Direct URL to Kubeshark Hub (e.g., https://kubeshark.example.com). When set, connects directly without kubectl/proxy.",
"examples": ["https://kubeshark.example.com", "http://localhost:8899"]
},
"kubeconfig": {
"type": "string",
"description": "Path to kubeconfig file for proxy mode.",
"examples": ["~/.kube/config", "/path/to/.kube/config"]
},
"allow-destructive": {
"type": "boolean",
"description": "Enable destructive operations (start_kubeshark, stop_kubeshark). Default: false for safety.",
"default": false
}
}
},
"modes": {
"url": {
"description": "Connect directly to an existing Kubeshark deployment via URL. Cluster management tools are disabled.",
"args": ["mcp", "--url", "${url}"]
},
"proxy": {
"description": "Connect via kubectl port-forward. Requires kubeconfig access to the cluster.",
"args": ["mcp", "--kubeconfig", "${kubeconfig}"]
},
"proxy-destructive": {
"description": "Proxy mode with destructive operations enabled.",
"args": ["mcp", "--kubeconfig", "${kubeconfig}", "--allow-destructive"]
}
}
}