k8sgpt/pkg/cache/interplex_based.go
Alex Jones e41ffd80d0
feat: add MCP support (#1471)
* feat: first mcp impl

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* chore: update

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* chore: wip

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* chore: switched to stdio transport

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* chore: readme

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* feat: fix the linter 🤖

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* feat: fix the linter 🤖

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* feat(mcp): implement MCP server and handler

- Implement MCP server and handler
- Add MCP server to serve
- Add MCP handler to handle MCP requests
- Add MCP server to serve
- Add MCP handler to handle MCP requests

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* feat: consolidating code duplication

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* feat: added http sse support

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* chore: fixed broken tests

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* chore: updated and fixed linter

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* chore: updated and fixed linter

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

* chore: updated the linter issues

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>

---------

Signed-off-by: Alex Jones <alexsimonjones@gmail.com>
2025-04-29 09:22:44 +01:00

132 lines
3.2 KiB
Go

package cache
import (
"context"
"errors"
"fmt"
"os"
rpc "buf.build/gen/go/interplex-ai/schemas/grpc/go/protobuf/schema/v1/schemav1grpc"
schemav1 "buf.build/gen/go/interplex-ai/schemas/protocolbuffers/go/protobuf/schema/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Compile-time assertion that *InterplexCache implements ICache.
var _ ICache = (*InterplexCache)(nil)

// InterplexCache is an ICache implementation that talks to a remote cache
// service over gRPC.
type InterplexCache struct {
	// configuration holds the connection string used when dialling the service.
	configuration InterplexCacheConfiguration
	// client is not referenced by any method visible in this file.
	// NOTE(review): possibly a leftover placeholder; confirm before removing.
	client InterplexClient
	// cacheServiceClient is the gRPC client most recently created by
	// Store, Load or Remove; each of those calls replaces it.
	cacheServiceClient rpc.CacheServiceClient
	// noCache is set by DisableCache and reported by IsCacheDisabled.
	noCache bool
}
// InterplexCacheConfiguration is the user-facing configuration for the
// Interplex cache backend.
type InterplexCacheConfiguration struct {
	// ConnectionString is the gRPC target passed to grpc.NewClient.
	// It is read from the "connectionString" key (mapstructure and yaml).
	ConnectionString string `mapstructure:"connectionString" yaml:"connectionString,omitempty"`
}
// InterplexClient is an empty struct type; it carries no fields or methods
// in this file.
type InterplexClient struct{}
// Configure copies the Interplex connection string from cacheInfo into the
// cache's own configuration.
//
// It returns an error when the supplied connection string is empty, in which
// case the existing configuration is left untouched.
func (c *InterplexCache) Configure(cacheInfo CacheProvider) error {
	target := cacheInfo.Interplex.ConnectionString
	if target == "" {
		return errors.New("connection string is required")
	}

	c.configuration.ConnectionString = target
	return nil
}
// Store writes data under key in the remote cache service.
//
// A new gRPC connection is created on every call and closed before the
// method returns. If the INTERPLEX_LOCAL_MODE environment variable is set to
// a non-empty value, the configured connection string is overwritten with
// "localhost:8084" (the overwrite persists on the receiver).
//
// It returns the error from creating the client or from the Set RPC, or nil
// on success. A failure to close the connection is printed, not returned.
func (c *InterplexCache) Store(key string, data string) error {
	if os.Getenv("INTERPLEX_LOCAL_MODE") != "" {
		c.configuration.ConnectionString = "localhost:8084"
	}

	// Use explicit insecure transport credentials, matching Remove.
	// grpc.WithBlock is intentionally not passed: the grpc-go documentation
	// lists it as unsupported by grpc.NewClient.
	conn, err := grpc.NewClient(c.configuration.ConnectionString, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	// Register the close only after the error check: conn is nil when
	// NewClient fails, and closing a nil connection would panic.
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			// Log the error but don't return it since this is a deferred function
			fmt.Printf("Error closing connection: %v\n", closeErr)
		}
	}()

	c.cacheServiceClient = rpc.NewCacheServiceClient(conn)

	req := schemav1.SetRequest{
		Key:   key,
		Value: data,
	}
	_, err = c.cacheServiceClient.Set(context.Background(), &req)
	return err
}
// Load fetches the value stored under key from the remote cache service.
//
// A new gRPC connection is created on every call and closed before the
// method returns. If the INTERPLEX_LOCAL_MODE environment variable is set to
// a non-empty value, the configured connection string is overwritten with
// "localhost:8084" (the overwrite persists on the receiver).
//
// It returns the response's Value on success. On any error (creating the
// client or the Get RPC) it returns the empty string and that error. A
// failure to close the connection is printed, not returned.
func (c *InterplexCache) Load(key string) (string, error) {
	if os.Getenv("INTERPLEX_LOCAL_MODE") != "" {
		c.configuration.ConnectionString = "localhost:8084"
	}

	// Use explicit insecure transport credentials, matching Remove.
	// grpc.WithBlock is intentionally not passed: the grpc-go documentation
	// lists it as unsupported by grpc.NewClient.
	conn, err := grpc.NewClient(c.configuration.ConnectionString, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return "", err
	}
	// Register the close only after the error check: conn is nil when
	// NewClient fails, and closing a nil connection would panic.
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			// Log the error but don't return it since this is a deferred function
			fmt.Printf("Error closing connection: %v\n", closeErr)
		}
	}()

	c.cacheServiceClient = rpc.NewCacheServiceClient(conn)

	req := schemav1.GetRequest{
		Key: key,
	}
	resp, err := c.cacheServiceClient.Get(context.Background(), &req)
	if err != nil {
		return "", err
	}
	return resp.Value, nil
}
// List is not implemented for the Interplex cache. It always returns an
// empty, non-nil slice and a nil error.
func (c *InterplexCache) List() ([]CacheObjectDetails, error) {
	none := make([]CacheObjectDetails, 0)
	return none, nil
}
// Remove deletes key from the remote cache service.
//
// A new gRPC connection is created on every call and closed before the
// method returns. If the INTERPLEX_LOCAL_MODE environment variable is set to
// a non-empty value, the configured connection string is overwritten with
// "localhost:8084" (the overwrite persists on the receiver).
//
// It returns the error from creating the client or from the Delete RPC, or
// nil on success. A failure to close the connection is printed, not returned.
func (c *InterplexCache) Remove(key string) error {
	if mode := os.Getenv("INTERPLEX_LOCAL_MODE"); mode != "" {
		c.configuration.ConnectionString = "localhost:8084"
	}

	transport := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, dialErr := grpc.NewClient(c.configuration.ConnectionString, transport)
	if dialErr != nil {
		return dialErr
	}
	defer func() {
		closeErr := conn.Close()
		if closeErr == nil {
			return
		}
		// A deferred function cannot return the error to the caller, so report it.
		fmt.Printf("Error closing connection: %v\n", closeErr)
	}()

	c.cacheServiceClient = rpc.NewCacheServiceClient(conn)

	_, rpcErr := c.cacheServiceClient.Delete(context.Background(), &schemav1.DeleteRequest{Key: key})
	return rpcErr
}
// Exists reports whether Load(key) succeeds.
//
// Any error from Load, including a failure to reach the service, yields false.
func (c *InterplexCache) Exists(key string) bool {
	if _, err := c.Load(key); err != nil {
		return false
	}
	return true
}
// IsCacheDisabled reports whether DisableCache has been called on this cache.
func (c *InterplexCache) IsCacheDisabled() bool {
	disabled := c.noCache
	return disabled
}
// GetName returns the identifier of this cache backend, "interplex".
func (c *InterplexCache) GetName() string {
	const name = "interplex"
	return name
}
// DisableCache marks the cache as disabled; IsCacheDisabled returns true
// afterwards. No method in this file resets the flag.
func (c *InterplexCache) DisableCache() {
	c.noCache = true
}