mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Clean up how client is passed to Kubelet in preparation for reading pods
Also fixes how Kubelet server looks up pods by name when there are multiple sources.
This commit is contained in:
parent
880ecef6fe
commit
ba53d723d3
1
.gitignore
vendored
1
.gitignore
vendored
@ -24,6 +24,7 @@ Session.vim
|
||||
|
||||
# Go test binaries
|
||||
*.test
|
||||
/hack/.test-cmd-auth
|
||||
|
||||
# Mercurial files
|
||||
**/.hg
|
||||
|
@ -191,11 +191,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
||||
minionController.Run(10 * time.Second)
|
||||
|
||||
// Kubelet (localhost)
|
||||
standalone.SimpleRunKubelet(etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
|
||||
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
|
||||
// Kubelet (machine)
|
||||
// Create a second kubelet so that the guestbook example's two redis slaves both
|
||||
// have a place they can schedule.
|
||||
standalone.SimpleRunKubelet(etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)
|
||||
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)
|
||||
|
||||
return apiServer.URL
|
||||
}
|
||||
|
@ -100,10 +100,13 @@ func main() {
|
||||
glog.Info(err)
|
||||
}
|
||||
|
||||
client, err := standalone.GetAPIServerClient(*authPath, apiServerList)
|
||||
if err != nil && len(apiServerList) > 0 {
|
||||
glog.Warningf("No API client: %v", err)
|
||||
}
|
||||
|
||||
kcfg := standalone.KubeletConfig{
|
||||
Address: address,
|
||||
AuthPath: *authPath,
|
||||
ApiServerList: apiServerList,
|
||||
AllowPrivileged: *allowPrivileged,
|
||||
HostnameOverride: *hostnameOverride,
|
||||
RootDirectory: *rootDirectory,
|
||||
@ -125,6 +128,7 @@ func main() {
|
||||
EnableServer: *enableServer,
|
||||
EnableDebuggingHandlers: *enableDebuggingHandlers,
|
||||
DockerClient: util.ConnectToDockerOrDie(*dockerEndpoint),
|
||||
KubeClient: client,
|
||||
EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile),
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string
|
||||
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
|
||||
|
||||
dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint)
|
||||
standalone.SimpleRunKubelet(etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
|
||||
standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
|
||||
}
|
||||
|
||||
func newApiClient(addr string, port int) *client.Client {
|
||||
|
@ -113,6 +113,8 @@ sudo "${GO_OUT}/kubelet" \
|
||||
--etcd_servers="http://127.0.0.1:4001" \
|
||||
--hostname_override="127.0.0.1" \
|
||||
--address="127.0.0.1" \
|
||||
--api_servers="${API_HOST}:${API_PORT}" \
|
||||
--auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
|
||||
--port="$KUBELET_PORT" >"${KUBELET_LOG}" 2>&1 &
|
||||
KUBELET_PID=$!
|
||||
|
||||
|
@ -58,6 +58,8 @@ kube::log::status "Starting kubelet"
|
||||
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
|
||||
--hostname_override="127.0.0.1" \
|
||||
--address="127.0.0.1" \
|
||||
--api_servers="${API_HOST}:${API_PORT}" \
|
||||
--auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
|
||||
--port="$KUBELET_PORT" 1>&2 &
|
||||
KUBELET_PID=$!
|
||||
|
||||
|
@ -1132,6 +1132,8 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
||||
|
||||
// GetKubeletContainerLogs returns logs from the container
|
||||
// The second parameter of GetPodInfo and FindPodContainer methods represents pod UUID, which is allowed to be blank
|
||||
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
|
||||
// or all of them.
|
||||
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error {
|
||||
_, err := kl.GetPodInfo(podFullName, "")
|
||||
if err == dockertools.ErrNoContainersInPod {
|
||||
@ -1153,6 +1155,18 @@ func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
|
||||
return kl.pods, nil
|
||||
}
|
||||
|
||||
// GetPodFullName provides the first pod that matches namespace and name, or false
|
||||
// if no such pod can be found.
|
||||
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
|
||||
for i := range kl.pods {
|
||||
pod := &kl.pods[i]
|
||||
if pod.Namespace == namespace && pod.Name == name {
|
||||
return pod, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// GetPodInfo returns information from Docker about the containers in a pod
|
||||
func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) {
|
||||
var manifest api.PodSpec
|
||||
|
@ -33,7 +33,6 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/golang/glog"
|
||||
"github.com/google/cadvisor/info"
|
||||
@ -66,6 +65,7 @@ type HostInterface interface {
|
||||
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
|
||||
GetMachineInfo() (*info.MachineInfo, error)
|
||||
GetBoundPods() ([]api.BoundPod, error)
|
||||
GetPodByName(namespace, name string) (*api.BoundPod, bool)
|
||||
GetPodInfo(name, uuid string) (api.PodInfo, error)
|
||||
RunInContainer(name, uuid, container string, cmd []string) ([]byte, error)
|
||||
GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
|
||||
@ -146,13 +146,11 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
||||
follow, _ := strconv.ParseBool(uriValues.Get("follow"))
|
||||
tail := uriValues.Get("tail")
|
||||
|
||||
podFullName := GetPodFullName(&api.BoundPod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: podID,
|
||||
Namespace: podNamespace,
|
||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
|
||||
},
|
||||
})
|
||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||
if !ok {
|
||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
fw := FlushWriter{writer: w}
|
||||
if flusher, ok := fw.writer.(http.Flusher); ok {
|
||||
@ -162,7 +160,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
w.Header().Set("Transfer-Encoding", "chunked")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
err = s.host.GetKubeletContainerLogs(podFullName, containerName, tail, follow, &fw, &fw)
|
||||
err = s.host.GetKubeletContainerLogs(GetPodFullName(pod), containerName, tail, follow, &fw, &fw)
|
||||
if err != nil {
|
||||
s.error(w, err)
|
||||
return
|
||||
@ -217,19 +215,12 @@ func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request, version
|
||||
http.Error(w, "Missing 'podNamespace=' query entry.", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// TODO: backwards compatibility with existing API, needs API change
|
||||
podFullName := GetPodFullName(&api.BoundPod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: podID,
|
||||
Namespace: podNamespace,
|
||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
|
||||
},
|
||||
})
|
||||
info, err := s.host.GetPodInfo(podFullName, podUUID)
|
||||
if err == dockertools.ErrNoContainersInPod {
|
||||
http.Error(w, "api.BoundPod does not exist", http.StatusNotFound)
|
||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||
if !ok {
|
||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
info, err := s.host.GetPodInfo(GetPodFullName(pod), podUUID)
|
||||
if err != nil {
|
||||
s.error(w, err)
|
||||
return
|
||||
@ -293,15 +284,13 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
|
||||
http.Error(w, "Unexpected path for command running", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
podFullName := GetPodFullName(&api.BoundPod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: podID,
|
||||
Namespace: podNamespace,
|
||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
|
||||
},
|
||||
})
|
||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||
if !ok {
|
||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
command := strings.Split(u.Query().Get("cmd"), " ")
|
||||
data, err := s.host.RunInContainer(podFullName, uuid, container, command)
|
||||
data, err := s.host.RunInContainer(GetPodFullName(pod), uuid, container, command)
|
||||
if err != nil {
|
||||
s.error(w, err)
|
||||
return
|
||||
@ -344,24 +333,20 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
|
||||
// TODO(monnand) Implement this
|
||||
errors.New("pod level status currently unimplemented")
|
||||
case 3:
|
||||
// Backward compatibility without uuid information
|
||||
podFullName := GetPodFullName(&api.BoundPod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: components[1],
|
||||
Namespace: api.NamespaceDefault,
|
||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
|
||||
},
|
||||
})
|
||||
stats, err = s.host.GetContainerInfo(podFullName, "", components[2], &query)
|
||||
// Backward compatibility without uuid information, does not support namespace
|
||||
pod, ok := s.host.GetPodByName(api.NamespaceDefault, components[1])
|
||||
if !ok {
|
||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
stats, err = s.host.GetContainerInfo(GetPodFullName(pod), "", components[2], &query)
|
||||
case 5:
|
||||
podFullName := GetPodFullName(&api.BoundPod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: components[2],
|
||||
Namespace: components[1],
|
||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
|
||||
},
|
||||
})
|
||||
stats, err = s.host.GetContainerInfo(podFullName, components[3], components[4], &query)
|
||||
pod, ok := s.host.GetPodByName(components[1], components[2])
|
||||
if !ok {
|
||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
stats, err = s.host.GetContainerInfo(GetPodFullName(pod), components[3], components[4], &query)
|
||||
default:
|
||||
http.Error(w, "unknown resource.", http.StatusNotFound)
|
||||
return
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
)
|
||||
|
||||
type fakeKubelet struct {
|
||||
podByNameFunc func(namespace, name string) (*api.BoundPod, bool)
|
||||
infoFunc func(name string) (api.PodInfo, error)
|
||||
containerInfoFunc func(podFullName, uid, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
|
||||
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
|
||||
@ -43,6 +44,10 @@ type fakeKubelet struct {
|
||||
containerLogsFunc func(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
|
||||
}
|
||||
|
||||
func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
|
||||
return fk.podByNameFunc(namespace, name)
|
||||
}
|
||||
|
||||
func (fk *fakeKubelet) GetPodInfo(name, uuid string) (api.PodInfo, error) {
|
||||
return fk.infoFunc(name)
|
||||
}
|
||||
@ -88,7 +93,19 @@ func newServerTest() *serverTestFramework {
|
||||
updateChan: make(chan interface{}),
|
||||
}
|
||||
fw.updateReader = startReading(fw.updateChan)
|
||||
fw.fakeKubelet = &fakeKubelet{}
|
||||
fw.fakeKubelet = &fakeKubelet{
|
||||
podByNameFunc: func(namespace, name string) (*api.BoundPod, bool) {
|
||||
return &api.BoundPod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
Name: name,
|
||||
Annotations: map[string]string{
|
||||
ConfigSourceAnnotationKey: "etcd",
|
||||
},
|
||||
},
|
||||
}, true
|
||||
},
|
||||
}
|
||||
server := NewServer(fw.fakeKubelet, true)
|
||||
fw.serverUnderTest = &server
|
||||
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
|
||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
@ -27,7 +26,6 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
@ -97,47 +95,12 @@ func SetupLogging() {
|
||||
record.StartLogging(glog.Infof)
|
||||
}
|
||||
|
||||
// TODO: move this into pkg/client
|
||||
func getApiserverClient(authPath string, apiServerList util.StringList) (*client.Client, error) {
|
||||
authInfo, err := clientauth.LoadFromFile(authPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(apiServerList) < 1 {
|
||||
return nil, fmt.Errorf("no apiservers specified.")
|
||||
}
|
||||
// TODO: adapt Kube client to support LB over several servers
|
||||
if len(apiServerList) > 1 {
|
||||
glog.Infof("Mulitple api servers specified. Picking first one")
|
||||
}
|
||||
clientConfig.Host = apiServerList[0]
|
||||
if c, err := client.New(&clientConfig); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
func SetupEventSending(authPath string, apiServerList util.StringList) {
|
||||
// Make an API client if possible.
|
||||
if len(apiServerList) < 1 {
|
||||
glog.Info("No api servers specified.")
|
||||
} else {
|
||||
if apiClient, err := getApiserverClient(authPath, apiServerList); err != nil {
|
||||
glog.Errorf("Unable to make apiserver client: %v", err)
|
||||
} else {
|
||||
// Send events to APIserver if there is a client.
|
||||
hostname := util.GetHostname("")
|
||||
glog.Infof("Sending events to APIserver.")
|
||||
record.StartRecording(apiClient.Events(""),
|
||||
api.EventSource{
|
||||
Component: "kubelet",
|
||||
Host: hostname,
|
||||
})
|
||||
}
|
||||
}
|
||||
func SetupEventSending(client *client.Client) {
|
||||
glog.Infof("Sending events to api server.")
|
||||
hostname := util.GetHostname("")
|
||||
record.StartRecording(client.Events(""),
|
||||
api.EventSource{
|
||||
Component: "kubelet",
|
||||
Host: hostname,
|
||||
})
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
|
||||
minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
||||
@ -55,6 +56,31 @@ func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
|
||||
// TODO: replace this with clientcmd
|
||||
func GetAPIServerClient(authPath string, apiServerList util.StringList) (*client.Client, error) {
|
||||
authInfo, err := clientauth.LoadFromFile(authPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(apiServerList) < 1 {
|
||||
return nil, fmt.Errorf("no api servers specified.")
|
||||
}
|
||||
// TODO: adapt Kube client to support LB over several servers
|
||||
if len(apiServerList) > 1 {
|
||||
glog.Infof("Mulitple api servers specified. Picking first one")
|
||||
}
|
||||
clientConfig.Host = apiServerList[0]
|
||||
c, err := client.New(&clientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// RunApiServer starts an API server in a go routine.
|
||||
func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, port int) {
|
||||
handler := delegateHandler{}
|
||||
@ -129,8 +155,9 @@ func RunControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
|
||||
|
||||
// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient.
|
||||
// Under the hood it calls RunKubelet (below)
|
||||
func SimpleRunKubelet(etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint) {
|
||||
func SimpleRunKubelet(client *client.Client, etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint) {
|
||||
kcfg := KubeletConfig{
|
||||
KubeClient: client,
|
||||
EtcdClient: etcdClient,
|
||||
DockerClient: dockerClient,
|
||||
HostnameOverride: hostname,
|
||||
@ -152,7 +179,11 @@ func SimpleRunKubelet(etcdClient tools.EtcdClient, dockerClient dockertools.Dock
|
||||
// 3 Standalone 'kubernetes' binary
|
||||
// Eventually, #2 will be replaced with instances of #3
|
||||
func RunKubelet(kcfg *KubeletConfig) {
|
||||
kubelet.SetupEventSending(kcfg.AuthPath, kcfg.ApiServerList)
|
||||
if kcfg.KubeClient != nil {
|
||||
kubelet.SetupEventSending(kcfg.KubeClient)
|
||||
} else {
|
||||
glog.Infof("No api server defined - no events will be sent.")
|
||||
}
|
||||
kubelet.SetupLogging()
|
||||
kubelet.SetupCapabilities(kcfg.AllowPrivileged)
|
||||
|
||||
@ -210,11 +241,10 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
|
||||
|
||||
type KubeletConfig struct {
|
||||
EtcdClient tools.EtcdClient
|
||||
KubeClient *client.Client
|
||||
DockerClient dockertools.DockerInterface
|
||||
CAdvisorPort uint
|
||||
Address util.IP
|
||||
AuthPath string
|
||||
ApiServerList util.StringList
|
||||
AllowPrivileged bool
|
||||
HostnameOverride string
|
||||
RootDirectory string
|
||||
|
Loading…
Reference in New Issue
Block a user