diff --git a/pkg/kubeletclient/kubeletclient.go b/pkg/kubeletclient/kubeletclient.go index 5a51c0073..4c89bd852 100644 --- a/pkg/kubeletclient/kubeletclient.go +++ b/pkg/kubeletclient/kubeletclient.go @@ -20,7 +20,7 @@ import ( ) const ( - defaultkubeletSocket = "kubelet" // which is defined in k8s.io/kubernetes/pkg/kubelet/apis/podresources + defaultKubeletSocket = "kubelet" // which is defined in k8s.io/kubernetes/pkg/kubelet/apis/podresources kubeletConnectionTimeout = 10 * time.Second defaultKubeletSocketFile = "kubelet.sock" defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb @@ -38,10 +38,21 @@ func LocalEndpoint(path, file string) (string, error) { return filepath.Join(u.String(), file+".sock"), nil } +func removeUnixProtocol(endpoint string) (string, error) { + u, err := url.Parse(endpoint) + if err != nil { + return "", err + } + if u.Scheme != unixProtocol { + return "", fmt.Errorf("only support unix socket endpoint") + } + return u.Path, nil +} + // GetResourceClient returns an instance of ResourceClient interface initialized with Pod resource information func GetResourceClient(kubeletSocket string) (types.ResourceClient, error) { if kubeletSocket == "" { - kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, defaultkubeletSocket) + kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, defaultKubeletSocket) } // If Kubelet resource API endpoint exist use that by default // Or else fallback with checkpoint file @@ -59,10 +70,14 @@ func dial(ctx context.Context, addr string) (net.Conn, error) { } func getKubeletResourceClient(kubeletSocket string, timeout time.Duration) (podresourcesapi.PodResourcesListerClient, *grpc.ClientConn, error) { + addr, err := removeUnixProtocol(kubeletSocket) + if err != nil { + return nil, nil, err + } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - conn, err := grpc.DialContext(ctx, kubeletSocket, grpc.WithTransportCredentials(insecure.NewCredentials()), + conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(dial), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultPodResourcesMaxSize))) if err != nil { @@ -74,7 +89,7 @@ func getKubeletResourceClient(kubeletSocket string, timeout time.Duration) (podr func getKubeletClient(kubeletSocket string) (types.ResourceClient, error) { newClient := &kubeletClient{} if kubeletSocket == "" { - kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, kubeletSocket) + kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, defaultKubeletSocket) } client, conn, err := getKubeletResourceClient(kubeletSocket, 10*time.Second) diff --git a/pkg/kubeletclient/kubeletclient_test.go b/pkg/kubeletclient/kubeletclient_test.go index 63fd53841..6d0f2ec3f 100644 --- a/pkg/kubeletclient/kubeletclient_test.go +++ b/pkg/kubeletclient/kubeletclient_test.go @@ -121,7 +121,7 @@ func setUp() error { socketDir = testingPodResourcesPath socketName = filepath.Join(socketDir, "kubelet.sock") - testKubeletSocket = socketName + testKubeletSocket, _ = LocalEndpoint(socketDir, "kubelet") fakeServer = &fakeResourceServer{server: grpc.NewServer()} podresourcesapi.RegisterPodResourcesListerServer(fakeServer.server, fakeServer) @@ -160,11 +160,17 @@ var _ = Describe("Kubelet resource endpoint data read operations", func() { }) It("should fail with missing file", func() { - _, err := GetResourceClient("sampleSocketString") + _, err := GetResourceClient("unix:/sampleSocketString") Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("error reading file")) + }) + + It("should fail with invalid protocol", func() { + _, err := GetResourceClient("tcp:" + socketName) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("only support unix socket endpoint")) }) }) - Context("GetPodResourceMap() with valid pod name and namespace", func() { It("should return no error", func() { podUID := k8sTypes.UID("970a395d-bb3b-11e8-89df-408d5c537d23")