mirror of
https://github.com/k8snetworkplumbingwg/multus-cni.git
synced 2025-08-25 11:32:16 +00:00
fix resource api grpc failed to connect to unix socket (#946)
After commitc6fa047212
resource api got broken in 2 places. first place handled byf530d3eb84
The second break was in replacing podresources.GetV1Client with getKubeletResourceClient. The GetV1Client remove the protocol (e.g unix:) by calling GetAddressAndDialer, but the getKubeletResourceClient is using the full endpoint (with the portocol) causing failed to connect to unix socket. This patch remove the unix: prefix before the grpc connect. Fixes: #944 Signed-off-by: Moshe Levi <moshele@nvidia.com> Signed-off-by: Moshe Levi <moshele@nvidia.com>
This commit is contained in:
parent
8550fa62a5
commit
6f8fa8c286
@ -20,7 +20,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultkubeletSocket = "kubelet" // which is defined in k8s.io/kubernetes/pkg/kubelet/apis/podresources
|
||||
defaultKubeletSocket = "kubelet" // which is defined in k8s.io/kubernetes/pkg/kubelet/apis/podresources
|
||||
kubeletConnectionTimeout = 10 * time.Second
|
||||
defaultKubeletSocketFile = "kubelet.sock"
|
||||
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb
|
||||
@ -38,10 +38,21 @@ func LocalEndpoint(path, file string) (string, error) {
|
||||
return filepath.Join(u.String(), file+".sock"), nil
|
||||
}
|
||||
|
||||
func removeUnixProtocol(endpoint string) (string, error) {
|
||||
u, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if u.Scheme != unixProtocol {
|
||||
return "", fmt.Errorf("only support unix socket endpoint")
|
||||
}
|
||||
return u.Path, nil
|
||||
}
|
||||
|
||||
// GetResourceClient returns an instance of ResourceClient interface initialized with Pod resource information
|
||||
func GetResourceClient(kubeletSocket string) (types.ResourceClient, error) {
|
||||
if kubeletSocket == "" {
|
||||
kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, defaultkubeletSocket)
|
||||
kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, defaultKubeletSocket)
|
||||
}
|
||||
// If Kubelet resource API endpoint exist use that by default
|
||||
// Or else fallback with checkpoint file
|
||||
@ -59,10 +70,14 @@ func dial(ctx context.Context, addr string) (net.Conn, error) {
|
||||
}
|
||||
|
||||
func getKubeletResourceClient(kubeletSocket string, timeout time.Duration) (podresourcesapi.PodResourcesListerClient, *grpc.ClientConn, error) {
|
||||
addr, err := removeUnixProtocol(kubeletSocket)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.DialContext(ctx, kubeletSocket, grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithContextDialer(dial),
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultPodResourcesMaxSize)))
|
||||
if err != nil {
|
||||
@ -74,7 +89,7 @@ func getKubeletResourceClient(kubeletSocket string, timeout time.Duration) (podr
|
||||
func getKubeletClient(kubeletSocket string) (types.ResourceClient, error) {
|
||||
newClient := &kubeletClient{}
|
||||
if kubeletSocket == "" {
|
||||
kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, kubeletSocket)
|
||||
kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, defaultKubeletSocket)
|
||||
}
|
||||
|
||||
client, conn, err := getKubeletResourceClient(kubeletSocket, 10*time.Second)
|
||||
|
@ -121,7 +121,7 @@ func setUp() error {
|
||||
|
||||
socketDir = testingPodResourcesPath
|
||||
socketName = filepath.Join(socketDir, "kubelet.sock")
|
||||
testKubeletSocket = socketName
|
||||
testKubeletSocket, _ = LocalEndpoint(socketDir, "kubelet")
|
||||
|
||||
fakeServer = &fakeResourceServer{server: grpc.NewServer()}
|
||||
podresourcesapi.RegisterPodResourcesListerServer(fakeServer.server, fakeServer)
|
||||
@ -160,11 +160,17 @@ var _ = Describe("Kubelet resource endpoint data read operations", func() {
|
||||
})
|
||||
|
||||
It("should fail with missing file", func() {
|
||||
_, err := GetResourceClient("sampleSocketString")
|
||||
_, err := GetResourceClient("unix:/sampleSocketString")
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
Expect(err.Error()).To(ContainSubstring("error reading file"))
|
||||
})
|
||||
|
||||
It("should fail with invalid protocol", func() {
|
||||
_, err := GetResourceClient("tcp:" + socketName)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("only support unix socket endpoint"))
|
||||
})
|
||||
})
|
||||
Context("GetPodResourceMap() with valid pod name and namespace", func() {
|
||||
It("should return no error", func() {
|
||||
podUID := k8sTypes.UID("970a395d-bb3b-11e8-89df-408d5c537d23")
|
||||
|
Loading…
Reference in New Issue
Block a user