mirror of
https://github.com/k8sgpt-ai/k8sgpt.git
synced 2025-05-11 09:34:43 +00:00
feat: adds interplex as a caching provider (#1328)
* feat: adds interplex as a caching provider Signed-off-by: AlexsJones <alexsimonjones@gmail.com> * chore: refactored cache input to lower Signed-off-by: AlexsJones <alexsimonjones@gmail.com> * chore: refactored cache input to lower Signed-off-by: AlexsJones <alexsimonjones@gmail.com> --------- Signed-off-by: AlexsJones <alexsimonjones@gmail.com>
This commit is contained in:
parent
a841568a9c
commit
d6d80ee860
10
cmd/cache/add.go
vendored
10
cmd/cache/add.go
vendored
@ -17,6 +17,7 @@ package cache
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/fatih/color"
|
||||
"github.com/k8sgpt-ai/k8sgpt/pkg/cache"
|
||||
@ -40,9 +41,10 @@ var addCmd = &cobra.Command{
|
||||
Short: "Add a remote cache",
|
||||
Long: `This command allows you to add a remote cache to store the results of an analysis.
|
||||
The supported cache types are:
|
||||
- Azure Blob storage
|
||||
- Google Cloud storage
|
||||
- S3`,
|
||||
- Azure Blob storage (e.g., k8sgpt cache add azure)
|
||||
- Google Cloud storage (e.g., k8sgpt cache add gcs)
|
||||
- S3 (e.g., k8sgpt cache add s3)
|
||||
- Interplex (e.g., k8sgpt cache add interplex)`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
if len(args) == 0 {
|
||||
color.Red("Error: Please provide a value for cache types. Run k8sgpt cache add --help")
|
||||
@ -50,7 +52,7 @@ var addCmd = &cobra.Command{
|
||||
}
|
||||
fmt.Println(color.YellowString("Adding remote based cache"))
|
||||
cacheType := args[0]
|
||||
remoteCache, err := cache.NewCacheProvider(cacheType, bucketName, region, endpoint, storageAccount, containerName, projectId, insecure)
|
||||
remoteCache, err := cache.NewCacheProvider(strings.ToLower(cacheType), bucketName, region, endpoint, storageAccount, containerName, projectId, insecure)
|
||||
if err != nil {
|
||||
color.Red("Error: %v", err)
|
||||
os.Exit(1)
|
||||
|
45
cmd/cache/get.go
vendored
Normal file
45
cmd/cache/get.go
vendored
Normal file
@ -0,0 +1,45 @@
|
||||
/*
|
||||
Copyright 2023 The K8sGPT Authors.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/fatih/color"
|
||||
"github.com/k8sgpt-ai/k8sgpt/pkg/cache"
|
||||
"github.com/spf13/cobra"
|
||||
"os"
|
||||
)
|
||||
|
||||
// listCmd represents the list command
|
||||
var getCmd = &cobra.Command{
|
||||
Use: "get",
|
||||
Short: "Get the current cache",
|
||||
Long: `Returns the current remote cache being used`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
|
||||
// load remote cache if it is configured
|
||||
c, err := cache.GetCacheConfiguration()
|
||||
if err != nil {
|
||||
color.Red("Error: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("Current remote cache is: %s", c.GetName())
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
CacheCmd.AddCommand(getCmd)
|
||||
|
||||
}
|
2
go.mod
2
go.mod
@ -26,6 +26,8 @@ require (
|
||||
require github.com/adrg/xdg v0.5.3
|
||||
|
||||
require (
|
||||
buf.build/gen/go/interplex-ai/schemas/grpc/go v1.5.1-20241021105030-466c70d726a9.1
|
||||
buf.build/gen/go/interplex-ai/schemas/protocolbuffers/go v1.35.1-20241021105030-466c70d726a9.1
|
||||
buf.build/gen/go/k8sgpt-ai/k8sgpt/grpc-ecosystem/gateway/v2 v2.24.0-20241118152629-1379a5a1889d.1
|
||||
buf.build/gen/go/k8sgpt-ai/k8sgpt/grpc/go v1.5.1-20241118152629-1379a5a1889d.1
|
||||
buf.build/gen/go/k8sgpt-ai/k8sgpt/protocolbuffers/go v1.35.2-20241118152629-1379a5a1889d.1
|
||||
|
4
go.sum
4
go.sum
@ -6,6 +6,10 @@ atomicgo.dev/keyboard v0.2.9 h1:tOsIid3nlPLZ3lwgG8KZMp/SFmr7P0ssEN5JUsm78K8=
|
||||
atomicgo.dev/keyboard v0.2.9/go.mod h1:BC4w9g00XkxH/f1HXhW2sXmJFOCWbKn9xrOunSFtExQ=
|
||||
atomicgo.dev/schedule v0.1.0 h1:nTthAbhZS5YZmgYbb2+DH8uQIZcTlIrd4eYr3UQxEjs=
|
||||
atomicgo.dev/schedule v0.1.0/go.mod h1:xeUa3oAkiuHYh8bKiQBRojqAMq3PXXbJujjb0hw8pEU=
|
||||
buf.build/gen/go/interplex-ai/schemas/grpc/go v1.5.1-20241021105030-466c70d726a9.1 h1:Rby//teA60pwkpkXiwFa1YnXFOwIr46Fn+HNSwUwPcI=
|
||||
buf.build/gen/go/interplex-ai/schemas/grpc/go v1.5.1-20241021105030-466c70d726a9.1/go.mod h1:PW0g4+gFAnj01VJZSF/eiAQfv3UcdWtFYgIQNtjtmHQ=
|
||||
buf.build/gen/go/interplex-ai/schemas/protocolbuffers/go v1.35.1-20241021105030-466c70d726a9.1 h1:ycQpP4Tu2RI6InhgkQQGNd/H3t4VDX8OfsYwxR9B41A=
|
||||
buf.build/gen/go/interplex-ai/schemas/protocolbuffers/go v1.35.1-20241021105030-466c70d726a9.1/go.mod h1:8JNKSWnHOjbQWTqZ7zzvwcwosjgc5blxF61kGqHqea4=
|
||||
buf.build/gen/go/k8sgpt-ai/k8sgpt/grpc-ecosystem/gateway/v2 v2.24.0-20241118152629-1379a5a1889d.1 h1:9r9t2pVf+X8oesYpfTATH9FGIWbVy70eExkEQWj/qrA=
|
||||
buf.build/gen/go/k8sgpt-ai/k8sgpt/grpc-ecosystem/gateway/v2 v2.24.0-20241118152629-1379a5a1889d.1/go.mod h1:M+KYheBX0z4c6yvFj2WUmr/Qs1KtxwsecD6Hv9SUo/s=
|
||||
buf.build/gen/go/k8sgpt-ai/k8sgpt/grpc/go v1.5.1-20241118152629-1379a5a1889d.1 h1:thB3o5jG9fU30xDbC5+PC3DLyB8/TjGtObhvC/A+AK0=
|
||||
|
18
pkg/cache/cache.go
vendored
18
pkg/cache/cache.go
vendored
@ -14,6 +14,7 @@ var (
|
||||
&FileBasedCache{},
|
||||
&GCSCache{},
|
||||
&S3Cache{},
|
||||
&InterplexCache{},
|
||||
}
|
||||
)
|
||||
|
||||
@ -54,15 +55,21 @@ func NewCacheProvider(cacheType, bucketname, region, endpoint, storageAccount, c
|
||||
case cacheType == "azure":
|
||||
cProvider.Azure.ContainerName = containerName
|
||||
cProvider.Azure.StorageAccount = storageAccount
|
||||
cProvider.CurrentCacheType = "azure"
|
||||
case cacheType == "gcs":
|
||||
cProvider.GCS.BucketName = bucketname
|
||||
cProvider.GCS.ProjectId = projectId
|
||||
cProvider.GCS.Region = region
|
||||
cProvider.CurrentCacheType = "gcs"
|
||||
case cacheType == "s3":
|
||||
cProvider.S3.BucketName = bucketname
|
||||
cProvider.S3.Region = region
|
||||
cProvider.S3.Endpoint = endpoint
|
||||
cProvider.S3.InsecureSkipVerify = insecure
|
||||
cProvider.CurrentCacheType = "s3"
|
||||
case cacheType == "interplex":
|
||||
cProvider.Interplex.ConnectionString = endpoint
|
||||
cProvider.CurrentCacheType = "interplex"
|
||||
default:
|
||||
return CacheProvider{}, status.Error(codes.Internal, fmt.Sprintf("%s is not a valid option", cacheType))
|
||||
}
|
||||
@ -83,20 +90,19 @@ func GetCacheConfiguration() (ICache, error) {
|
||||
}
|
||||
|
||||
var cache ICache
|
||||
|
||||
switch {
|
||||
case cacheInfo.GCS != GCSCacheConfiguration{}:
|
||||
case cacheInfo.CurrentCacheType == "gcs":
|
||||
cache = &GCSCache{}
|
||||
case cacheInfo.Azure != AzureCacheConfiguration{}:
|
||||
case cacheInfo.CurrentCacheType == "azure":
|
||||
cache = &AzureCache{}
|
||||
case cacheInfo.S3 != S3CacheConfiguration{}:
|
||||
case cacheInfo.CurrentCacheType == "s3":
|
||||
cache = &S3Cache{}
|
||||
case cacheInfo.CurrentCacheType == "interplex":
|
||||
cache = &InterplexCache{}
|
||||
default:
|
||||
cache = &FileBasedCache{}
|
||||
}
|
||||
|
||||
err_config := cache.Configure(cacheInfo)
|
||||
|
||||
return cache, err_config
|
||||
}
|
||||
|
||||
|
103
pkg/cache/interplex_based.go
vendored
Normal file
103
pkg/cache/interplex_based.go
vendored
Normal file
@ -0,0 +1,103 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
rpc "buf.build/gen/go/interplex-ai/schemas/grpc/go/protobuf/schema/v1/schemav1grpc"
|
||||
schemav1 "buf.build/gen/go/interplex-ai/schemas/protocolbuffers/go/protobuf/schema/v1"
|
||||
"context"
|
||||
"errors"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var _ ICache = (*InterplexCache)(nil)
|
||||
|
||||
type InterplexCache struct {
|
||||
configuration InterplexCacheConfiguration
|
||||
client InterplexClient
|
||||
cacheServiceClient rpc.CacheServiceClient
|
||||
noCache bool
|
||||
}
|
||||
|
||||
type InterplexCacheConfiguration struct {
|
||||
ConnectionString string `mapstructure:"connectionString" yaml:"connectionString,omitempty"`
|
||||
}
|
||||
|
||||
type InterplexClient struct {
|
||||
}
|
||||
|
||||
func (c *InterplexCache) Configure(cacheInfo CacheProvider) error {
|
||||
|
||||
if cacheInfo.Interplex.ConnectionString == "" {
|
||||
return errors.New("connection string is required")
|
||||
}
|
||||
c.configuration.ConnectionString = cacheInfo.Interplex.ConnectionString
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *InterplexCache) Store(key string, data string) error {
|
||||
|
||||
conn, err := grpc.NewClient(c.configuration.ConnectionString, grpc.WithInsecure(), grpc.WithBlock())
|
||||
defer conn.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serviceClient := rpc.NewCacheServiceClient(conn)
|
||||
c.cacheServiceClient = serviceClient
|
||||
req := schemav1.SetRequest{
|
||||
Key: key,
|
||||
Value: data,
|
||||
}
|
||||
_, err = c.cacheServiceClient.Set(context.Background(), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *InterplexCache) Load(key string) (string, error) {
|
||||
conn, err := grpc.NewClient(c.configuration.ConnectionString, grpc.WithInsecure(), grpc.WithBlock())
|
||||
defer conn.Close()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
serviceClient := rpc.NewCacheServiceClient(conn)
|
||||
c.cacheServiceClient = serviceClient
|
||||
req := schemav1.GetRequest{
|
||||
Key: key,
|
||||
}
|
||||
resp, err := c.cacheServiceClient.Get(context.Background(), &req)
|
||||
// check if response is cache error not found
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return resp.Value, nil
|
||||
}
|
||||
|
||||
func (InterplexCache) List() ([]CacheObjectDetails, error) {
|
||||
//TODO implement me
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (InterplexCache) Remove(key string) error {
|
||||
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (c *InterplexCache) Exists(key string) bool {
|
||||
if _, err := c.Load(key); err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *InterplexCache) IsCacheDisabled() bool {
|
||||
return c.noCache
|
||||
}
|
||||
|
||||
func (InterplexCache) GetName() string {
|
||||
//TODO implement me
|
||||
return "interplex"
|
||||
}
|
||||
|
||||
func (c *InterplexCache) DisableCache() {
|
||||
c.noCache = true
|
||||
}
|
77
pkg/cache/interplex_based_test.go
vendored
Normal file
77
pkg/cache/interplex_based_test.go
vendored
Normal file
@ -0,0 +1,77 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
rpc "buf.build/gen/go/interplex-ai/schemas/grpc/go/protobuf/schema/v1/schemav1grpc"
|
||||
schemav1 "buf.build/gen/go/interplex-ai/schemas/protocolbuffers/go/protobuf/schema/v1"
|
||||
"context"
|
||||
"errors"
|
||||
"google.golang.org/grpc"
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestInterplexCache(t *testing.T) {
|
||||
cache := &InterplexCache{
|
||||
configuration: InterplexCacheConfiguration{
|
||||
ConnectionString: "localhost:50051",
|
||||
},
|
||||
}
|
||||
|
||||
// Mock GRPC server setup
|
||||
go func() {
|
||||
lis, err := net.Listen("tcp", ":50051")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
s := grpc.NewServer()
|
||||
rpc.RegisterCacheServiceServer(s, &mockCacheService{})
|
||||
if err := s.Serve(lis); err != nil {
|
||||
t.Fatalf("failed to serve: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
t.Run("TestStore", func(t *testing.T) {
|
||||
err := cache.Store("key1", "value1")
|
||||
if err != nil {
|
||||
t.Errorf("Error storing value: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("TestLoad", func(t *testing.T) {
|
||||
value, err := cache.Load("key1")
|
||||
if err != nil {
|
||||
t.Errorf("Error loading value: %v", err)
|
||||
}
|
||||
if value != "value1" {
|
||||
t.Errorf("Expected value1, got %v", value)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("TestExists", func(t *testing.T) {
|
||||
exists := cache.Exists("key1")
|
||||
if !exists {
|
||||
t.Errorf("Expected key1 to exist")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type mockCacheService struct {
|
||||
rpc.UnimplementedCacheServiceServer
|
||||
data map[string]string
|
||||
}
|
||||
|
||||
func (m *mockCacheService) Set(ctx context.Context, req *schemav1.SetRequest) (*schemav1.SetResponse, error) {
|
||||
if m.data == nil {
|
||||
m.data = make(map[string]string)
|
||||
}
|
||||
m.data[req.Key] = req.Value
|
||||
return &schemav1.SetResponse{}, nil
|
||||
}
|
||||
|
||||
func (m *mockCacheService) Get(ctx context.Context, req *schemav1.GetRequest) (*schemav1.GetResponse, error) {
|
||||
value, exists := m.data[req.Key]
|
||||
if !exists {
|
||||
return nil, errors.New("key not found")
|
||||
}
|
||||
return &schemav1.GetResponse{Value: value}, nil
|
||||
}
|
8
pkg/cache/types.go
vendored
8
pkg/cache/types.go
vendored
@ -3,9 +3,11 @@ package cache
|
||||
import "time"
|
||||
|
||||
type CacheProvider struct {
|
||||
GCS GCSCacheConfiguration `mapstructucre:"gcs" yaml:"gcs,omitempty"`
|
||||
Azure AzureCacheConfiguration `mapstructucre:"azure" yaml:"azure,omitempty"`
|
||||
S3 S3CacheConfiguration `mapstructucre:"s3" yaml:"s3,omitempty"`
|
||||
CurrentCacheType string `mapstructure:"currentCacheType" yaml:"currentCacheType"`
|
||||
GCS GCSCacheConfiguration `mapstructure:"gcs" yaml:"gcs,omitempty"`
|
||||
Azure AzureCacheConfiguration `mapstructure:"azure" yaml:"azure,omitempty"`
|
||||
S3 S3CacheConfiguration `mapstructure:"s3" yaml:"s3,omitempty"`
|
||||
Interplex InterplexCacheConfiguration `mapstructure:"interplex" yaml:"interplex,omitempty"`
|
||||
}
|
||||
|
||||
type CacheObjectDetails struct {
|
||||
|
@ -71,6 +71,7 @@ func (h *Handler) AddConfig(ctx context.Context, i *schemav1.AddConfigRequest) (
|
||||
remoteCache, err = cache.NewCacheProvider("s3", i.Cache.GetS3Cache().BucketName, i.Cache.GetS3Cache().Region, i.Cache.GetS3Cache().Endpoint, notUsedStorageAcc, notUsedContainerName, notUsedProjectId, i.Cache.GetS3Cache().Insecure)
|
||||
case *schemav1.Cache_GcsCache:
|
||||
remoteCache, err = cache.NewCacheProvider("gcs", i.Cache.GetGcsCache().BucketName, i.Cache.GetGcsCache().Region, notUsedEndpoint, notUsedStorageAcc, notUsedContainerName, i.Cache.GetGcsCache().GetProjectId(), notUsedInsecure)
|
||||
|
||||
default:
|
||||
return resp, status.Error(codes.InvalidArgument, "Invalid cache configuration")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user