mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Remove HTTP Server support for pushing pods onto the kubelet.
This commit is contained in:
parent
edfae8660e
commit
928f52056e
@ -21,7 +21,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
@ -36,22 +35,20 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/ghodss/yaml"
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"github.com/google/cadvisor/info"
|
"github.com/google/cadvisor/info"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server is a http.Handler which exposes kubelet functionality over HTTP.
|
// Server is a http.Handler which exposes kubelet functionality over HTTP.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
host HostInterface
|
host HostInterface
|
||||||
updates chan<- interface{}
|
mux *http.ServeMux
|
||||||
mux *http.ServeMux
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
|
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
|
||||||
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, address net.IP, port uint, enableDebuggingHandlers bool) {
|
func ListenAndServeKubeletServer(host HostInterface, address net.IP, port uint, enableDebuggingHandlers bool) {
|
||||||
glog.V(1).Infof("Starting to listen on %s:%d", address, port)
|
glog.V(1).Infof("Starting to listen on %s:%d", address, port)
|
||||||
handler := NewServer(host, updates, enableDebuggingHandlers)
|
handler := NewServer(host, enableDebuggingHandlers)
|
||||||
s := &http.Server{
|
s := &http.Server{
|
||||||
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
|
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
|
||||||
Handler: &handler,
|
Handler: &handler,
|
||||||
@ -59,7 +56,6 @@ func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{},
|
|||||||
WriteTimeout: 5 * time.Minute,
|
WriteTimeout: 5 * time.Minute,
|
||||||
MaxHeaderBytes: 1 << 20,
|
MaxHeaderBytes: 1 << 20,
|
||||||
}
|
}
|
||||||
updates <- PodUpdate{[]api.BoundPod{}, SET, ServerSource}
|
|
||||||
glog.Fatal(s.ListenAndServe())
|
glog.Fatal(s.ListenAndServe())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,11 +73,10 @@ type HostInterface interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
|
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
|
||||||
func NewServer(host HostInterface, updates chan<- interface{}, enableDebuggingHandlers bool) Server {
|
func NewServer(host HostInterface, enableDebuggingHandlers bool) Server {
|
||||||
server := Server{
|
server := Server{
|
||||||
host: host,
|
host: host,
|
||||||
updates: updates,
|
mux: http.NewServeMux(),
|
||||||
mux: http.NewServeMux(),
|
|
||||||
}
|
}
|
||||||
server.InstallDefaultHandlers()
|
server.InstallDefaultHandlers()
|
||||||
if enableDebuggingHandlers {
|
if enableDebuggingHandlers {
|
||||||
@ -102,9 +97,6 @@ func (s *Server) InstallDefaultHandlers() {
|
|||||||
|
|
||||||
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
|
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
|
||||||
func (s *Server) InstallDebuggingHandlers() {
|
func (s *Server) InstallDebuggingHandlers() {
|
||||||
// ToDo: /container, /run, and /containers aren't debugging options, should probably be handled separately
|
|
||||||
s.mux.HandleFunc("/container", s.handleContainer)
|
|
||||||
s.mux.HandleFunc("/containers", s.handleContainers)
|
|
||||||
s.mux.HandleFunc("/run/", s.handleRun)
|
s.mux.HandleFunc("/run/", s.handleRun)
|
||||||
|
|
||||||
s.mux.HandleFunc("/logs/", s.handleLogs)
|
s.mux.HandleFunc("/logs/", s.handleLogs)
|
||||||
@ -116,60 +108,6 @@ func (s *Server) error(w http.ResponseWriter, err error) {
|
|||||||
http.Error(w, fmt.Sprintf("Internal Error: %v", err), http.StatusInternalServerError)
|
http.Error(w, fmt.Sprintf("Internal Error: %v", err), http.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleContainer handles container requests against the Kubelet.
|
|
||||||
func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) {
|
|
||||||
defer req.Body.Close()
|
|
||||||
data, err := ioutil.ReadAll(req.Body)
|
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// This is to provide backward compatibility. It only supports a single manifest
|
|
||||||
var pod api.BoundPod
|
|
||||||
var containerManifest api.ContainerManifest
|
|
||||||
err = yaml.Unmarshal(data, &containerManifest)
|
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pod.Name = containerManifest.ID
|
|
||||||
pod.UID = containerManifest.UUID
|
|
||||||
pod.Spec.Containers = containerManifest.Containers
|
|
||||||
pod.Spec.Volumes = containerManifest.Volumes
|
|
||||||
pod.Spec.RestartPolicy = containerManifest.RestartPolicy
|
|
||||||
//TODO: sha1 of manifest?
|
|
||||||
if pod.Name == "" {
|
|
||||||
pod.Name = "1"
|
|
||||||
}
|
|
||||||
if pod.UID == "" {
|
|
||||||
pod.UID = "1"
|
|
||||||
}
|
|
||||||
s.updates <- PodUpdate{[]api.BoundPod{pod}, SET, ServerSource}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleContainers handles containers requests against the Kubelet.
|
|
||||||
func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) {
|
|
||||||
defer req.Body.Close()
|
|
||||||
data, err := ioutil.ReadAll(req.Body)
|
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var specs []api.PodSpec
|
|
||||||
err = yaml.Unmarshal(data, &specs)
|
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pods := make([]api.BoundPod, len(specs))
|
|
||||||
for i := range specs {
|
|
||||||
pods[i].Name = fmt.Sprintf("%d", i+1)
|
|
||||||
pods[i].Spec = specs[i]
|
|
||||||
}
|
|
||||||
s.updates <- PodUpdate{pods, SET, ServerSource}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleContainerLogs handles containerLogs request against the Kubelet
|
// handleContainerLogs handles containerLogs request against the Kubelet
|
||||||
func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
||||||
defer req.Body.Close()
|
defer req.Body.Close()
|
||||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||||||
package kubelet
|
package kubelet
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -90,7 +89,7 @@ func newServerTest() *serverTestFramework {
|
|||||||
}
|
}
|
||||||
fw.updateReader = startReading(fw.updateChan)
|
fw.updateReader = startReading(fw.updateChan)
|
||||||
fw.fakeKubelet = &fakeKubelet{}
|
fw.fakeKubelet = &fakeKubelet{}
|
||||||
server := NewServer(fw.fakeKubelet, fw.updateChan, true)
|
server := NewServer(fw.fakeKubelet, true)
|
||||||
fw.serverUnderTest = &server
|
fw.serverUnderTest = &server
|
||||||
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
|
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
|
||||||
return fw
|
return fw
|
||||||
@ -111,160 +110,6 @@ func readResp(resp *http.Response) (string, error) {
|
|||||||
return string(body), err
|
return string(body), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestContainer(t *testing.T) {
|
|
||||||
fw := newServerTest()
|
|
||||||
expected := []api.ContainerManifest{
|
|
||||||
{
|
|
||||||
ID: "test_manifest",
|
|
||||||
UUID: "value",
|
|
||||||
Containers: []api.Container{
|
|
||||||
{
|
|
||||||
Name: "container",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Volumes: []api.Volume{
|
|
||||||
{
|
|
||||||
Name: "test",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
RestartPolicy: api.RestartPolicy{
|
|
||||||
Never: &api.RestartPolicyNever{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
body := bytes.NewBuffer([]byte(encodeJSON(expected[0]))) // Only send a single ContainerManifest
|
|
||||||
resp, err := http.Post(fw.testHTTPServer.URL+"/container", "application/json", body)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Post returned: %v", err)
|
|
||||||
}
|
|
||||||
resp.Body.Close()
|
|
||||||
close(fw.updateChan)
|
|
||||||
received := fw.updateReader.GetList()
|
|
||||||
if len(received) != 1 {
|
|
||||||
t.Errorf("Expected 1 manifest, but got %v", len(received))
|
|
||||||
}
|
|
||||||
expectedPods := []api.BoundPod{
|
|
||||||
{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "test_manifest",
|
|
||||||
UID: "value",
|
|
||||||
},
|
|
||||||
Spec: api.PodSpec{
|
|
||||||
Containers: []api.Container{
|
|
||||||
{
|
|
||||||
Name: "container",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Volumes: []api.Volume{
|
|
||||||
{
|
|
||||||
Name: "test",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
RestartPolicy: api.RestartPolicy{
|
|
||||||
Never: &api.RestartPolicyNever{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(expectedPods, received[0]) {
|
|
||||||
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContainers(t *testing.T) {
|
|
||||||
fw := newServerTest()
|
|
||||||
expected := []api.ContainerManifest{
|
|
||||||
{
|
|
||||||
ID: "test_manifest_1",
|
|
||||||
Containers: []api.Container{
|
|
||||||
{
|
|
||||||
Name: "container",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Volumes: []api.Volume{
|
|
||||||
{
|
|
||||||
Name: "test",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
RestartPolicy: api.RestartPolicy{
|
|
||||||
Never: &api.RestartPolicyNever{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ID: "test_manifest_2",
|
|
||||||
Containers: []api.Container{
|
|
||||||
{
|
|
||||||
Name: "container2",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Volumes: []api.Volume{
|
|
||||||
{
|
|
||||||
Name: "test2",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
RestartPolicy: api.RestartPolicy{
|
|
||||||
Never: &api.RestartPolicyNever{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
body := bytes.NewBuffer([]byte(encodeJSON(expected)))
|
|
||||||
resp, err := http.Post(fw.testHTTPServer.URL+"/containers", "application/json", body)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Post returned: %v", err)
|
|
||||||
}
|
|
||||||
resp.Body.Close()
|
|
||||||
close(fw.updateChan)
|
|
||||||
received := fw.updateReader.GetList()
|
|
||||||
if len(received) != 1 {
|
|
||||||
t.Errorf("Expected 1 update, but got %v", len(received))
|
|
||||||
}
|
|
||||||
expectedPods := []api.BoundPod{
|
|
||||||
{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "1",
|
|
||||||
},
|
|
||||||
Spec: api.PodSpec{
|
|
||||||
Containers: []api.Container{
|
|
||||||
{
|
|
||||||
Name: "container",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Volumes: []api.Volume{
|
|
||||||
{
|
|
||||||
Name: "test",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
RestartPolicy: api.RestartPolicy{
|
|
||||||
Never: &api.RestartPolicyNever{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
Name: "2",
|
|
||||||
},
|
|
||||||
Spec: api.PodSpec{
|
|
||||||
Containers: []api.Container{
|
|
||||||
{
|
|
||||||
Name: "container2",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Volumes: []api.Volume{
|
|
||||||
{
|
|
||||||
Name: "test2",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
RestartPolicy: api.RestartPolicy{
|
|
||||||
Never: &api.RestartPolicyNever{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(expectedPods, received[0]) {
|
|
||||||
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPodInfo(t *testing.T) {
|
func TestPodInfo(t *testing.T) {
|
||||||
fw := newServerTest()
|
fw := newServerTest()
|
||||||
expected := api.PodInfo{
|
expected := api.PodInfo{
|
||||||
|
@ -194,7 +194,7 @@ func startKubelet(k *kubelet.Kubelet, cfg *config.PodConfig, kc *KubeletConfig)
|
|||||||
// start the kubelet server
|
// start the kubelet server
|
||||||
if kc.EnableServer {
|
if kc.EnableServer {
|
||||||
go util.Forever(func() {
|
go util.Forever(func() {
|
||||||
kubelet.ListenAndServeKubeletServer(k, cfg.Channel(kubelet.ServerSource), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
|
kubelet.ListenAndServeKubeletServer(k, net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
|
||||||
}, 0)
|
}, 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user