mirror of
https://github.com/k8snetworkplumbingwg/multus-cni.git
synced 2025-09-16 06:26:21 +00:00
@@ -1,30 +1,48 @@
|
||||
package kubeletclient
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"gopkg.in/k8snetworkplumbingwg/multus-cni.v3/pkg/checkpoint"
|
||||
"gopkg.in/k8snetworkplumbingwg/multus-cni.v3/pkg/logging"
|
||||
"gopkg.in/k8snetworkplumbingwg/multus-cni.v3/pkg/types"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
//"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
||||
//"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
)
|
||||
|
||||
const (
|
||||
kubeletSocket = "kubelet" // which is defined in k8s.io/kubernetes/pkg/kubelet/apis/podresources
|
||||
kubeletConnectionTimeout = 10 * time.Second
|
||||
defaultKubeletSocketFile = "kubelet.sock"
|
||||
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb
|
||||
defaultPodResourcesPath = "/var/lib/kubelet/pod-resources"
|
||||
unixProtocol = "unix"
|
||||
)
|
||||
|
||||
// LocalEndpoint returns the full path to a unix socket at the given endpoint
|
||||
// which is in k8s.io/kubernetes/pkg/kubelet/util
|
||||
func LocalEndpoint(path, file string) (string, error) {
|
||||
u := url.URL{
|
||||
Scheme: unixProtocol,
|
||||
Path: path,
|
||||
}
|
||||
return filepath.Join(u.String(), file+".sock"), nil
|
||||
}
|
||||
|
||||
// GetResourceClient returns an instance of ResourceClient interface initialized with Pod resource information
|
||||
func GetResourceClient(kubeletSocket string) (types.ResourceClient, error) {
|
||||
if kubeletSocket == "" {
|
||||
kubeletSocket, _ = util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
|
||||
kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, kubeletSocket)
|
||||
}
|
||||
// If Kubelet resource API endpoint exist use that by default
|
||||
// Or else fallback with checkpoint file
|
||||
@@ -37,13 +55,30 @@ func GetResourceClient(kubeletSocket string) (types.ResourceClient, error) {
|
||||
return checkpoint.GetCheckpoint()
|
||||
}
|
||||
|
||||
func dial(ctx context.Context, addr string) (net.Conn, error) {
|
||||
return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr)
|
||||
}
|
||||
|
||||
func getKubeletResourceClient(kubeletSocket string, timeout time.Duration) (podresourcesapi.PodResourcesListerClient, *grpc.ClientConn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.DialContext(ctx, kubeletSocket, grpc.WithInsecure(),
|
||||
grpc.WithContextDialer(dial),
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultPodResourcesMaxSize)))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error dialing socket %s: %v", kubeletSocket, err)
|
||||
}
|
||||
return podresourcesapi.NewPodResourcesListerClient(conn), conn, nil
|
||||
}
|
||||
|
||||
func getKubeletClient(kubeletSocket string) (types.ResourceClient, error) {
|
||||
newClient := &kubeletClient{}
|
||||
if kubeletSocket == "" {
|
||||
kubeletSocket, _ = util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
|
||||
kubeletSocket, _ = LocalEndpoint(defaultPodResourcesPath, kubeletSocket)
|
||||
}
|
||||
|
||||
client, conn, err := podresources.GetV1Client(kubeletSocket, 10*time.Second, defaultPodResourcesMaxSize)
|
||||
client, conn, err := getKubeletResourceClient(kubeletSocket, 10*time.Second)
|
||||
if err != nil {
|
||||
return nil, logging.Errorf("getKubeletClient: error getting grpc client: %v\n", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user