mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-20 09:33:52 +00:00
Remove --redirect-container-streaming functionality (#95935)
* Remove --redirect-container-streaming functionality * Update bazel
This commit is contained in:
@@ -89,7 +89,6 @@ go_test(
|
||||
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/remotecommand:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
|
||||
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
|
||||
"//staging/src/k8s.io/kubelet/pkg/apis/stats/v1alpha1:go_default_library",
|
||||
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
|
||||
|
@@ -112,10 +112,8 @@ func AuthzTestCases() []AuthzTestCase {
|
||||
"/attach/{podNamespace}/{podID}/{uid}/{containerName}": "proxy",
|
||||
"/configz": "proxy",
|
||||
"/containerLogs/{podNamespace}/{podID}/{containerName}": "proxy",
|
||||
"/cri/": "proxy",
|
||||
"/cri/foo": "proxy",
|
||||
"/debug/flags/v": "proxy",
|
||||
"/debug/pprof/{subpath:*}": "proxy",
|
||||
"/debug/flags/v": "proxy",
|
||||
"/debug/pprof/{subpath:*}": "proxy",
|
||||
"/exec/{podNamespace}/{podID}/{containerName}": "proxy",
|
||||
"/exec/{podNamespace}/{podID}/{uid}/{containerName}": "proxy",
|
||||
"/healthz": "proxy",
|
||||
|
@@ -41,7 +41,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
|
||||
"k8s.io/utils/clock"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@@ -89,13 +89,12 @@ const (
|
||||
|
||||
// Server is a http.Handler which exposes kubelet functionality over HTTP.
|
||||
type Server struct {
|
||||
auth AuthInterface
|
||||
host HostInterface
|
||||
restfulCont containerInterface
|
||||
metricsBuckets sets.String
|
||||
metricsMethodBuckets sets.String
|
||||
resourceAnalyzer stats.ResourceAnalyzer
|
||||
redirectContainerStreaming bool
|
||||
auth AuthInterface
|
||||
host HostInterface
|
||||
restfulCont containerInterface
|
||||
metricsBuckets sets.String
|
||||
metricsMethodBuckets sets.String
|
||||
resourceAnalyzer stats.ResourceAnalyzer
|
||||
}
|
||||
|
||||
// TLSOptions holds the TLS options.
|
||||
@@ -144,11 +143,9 @@ func ListenAndServeKubeletServer(
|
||||
enableCAdvisorJSONEndpoints,
|
||||
enableDebuggingHandlers,
|
||||
enableContentionProfiling,
|
||||
redirectContainerStreaming,
|
||||
enableSystemLogHandler bool,
|
||||
criHandler http.Handler) {
|
||||
enableSystemLogHandler bool) {
|
||||
klog.Infof("Starting to listen on %s:%d", address, port)
|
||||
handler := NewServer(host, resourceAnalyzer, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, redirectContainerStreaming, enableSystemLogHandler, criHandler)
|
||||
handler := NewServer(host, resourceAnalyzer, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler)
|
||||
s := &http.Server{
|
||||
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
|
||||
Handler: &handler,
|
||||
@@ -170,7 +167,7 @@ func ListenAndServeKubeletServer(
|
||||
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
|
||||
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, enableCAdvisorJSONEndpoints bool) {
|
||||
klog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
|
||||
s := NewServer(host, resourceAnalyzer, nil, enableCAdvisorJSONEndpoints, false, false, false, false, nil)
|
||||
s := NewServer(host, resourceAnalyzer, nil, enableCAdvisorJSONEndpoints, false, false, false)
|
||||
|
||||
server := &http.Server{
|
||||
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
|
||||
@@ -224,24 +221,21 @@ func NewServer(
|
||||
enableCAdvisorJSONEndpoints,
|
||||
enableDebuggingHandlers,
|
||||
enableContentionProfiling,
|
||||
redirectContainerStreaming,
|
||||
enableSystemLogHandler bool,
|
||||
criHandler http.Handler) Server {
|
||||
enableSystemLogHandler bool) Server {
|
||||
server := Server{
|
||||
host: host,
|
||||
resourceAnalyzer: resourceAnalyzer,
|
||||
auth: auth,
|
||||
restfulCont: &filteringContainer{Container: restful.NewContainer()},
|
||||
metricsBuckets: sets.NewString(),
|
||||
metricsMethodBuckets: sets.NewString("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"),
|
||||
redirectContainerStreaming: redirectContainerStreaming,
|
||||
host: host,
|
||||
resourceAnalyzer: resourceAnalyzer,
|
||||
auth: auth,
|
||||
restfulCont: &filteringContainer{Container: restful.NewContainer()},
|
||||
metricsBuckets: sets.NewString(),
|
||||
metricsMethodBuckets: sets.NewString("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"),
|
||||
}
|
||||
if auth != nil {
|
||||
server.InstallAuthFilter()
|
||||
}
|
||||
server.InstallDefaultHandlers(enableCAdvisorJSONEndpoints)
|
||||
if enableDebuggingHandlers {
|
||||
server.InstallDebuggingHandlers(criHandler)
|
||||
server.InstallDebuggingHandlers()
|
||||
// To maintain backward compatibility serve logs only when enableDebuggingHandlers is also enabled
|
||||
// see https://github.com/kubernetes/kubernetes/pull/87273
|
||||
server.InstallSystemLogHandler(enableSystemLogHandler)
|
||||
@@ -409,7 +403,7 @@ func (s *Server) InstallDefaultHandlers(enableCAdvisorJSONEndpoints bool) {
|
||||
const pprofBasePath = "/debug/pprof/"
|
||||
|
||||
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
|
||||
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
|
||||
func (s *Server) InstallDebuggingHandlers() {
|
||||
klog.Infof("Adding debug handlers to kubelet server.")
|
||||
|
||||
s.addMetricsBucketMatcher("run")
|
||||
@@ -527,11 +521,6 @@ func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
|
||||
To(s.getRunningPods).
|
||||
Operation("getRunningPods"))
|
||||
s.restfulCont.Add(ws)
|
||||
|
||||
s.addMetricsBucketMatcher("cri")
|
||||
if criHandler != nil {
|
||||
s.restfulCont.Handle("/cri/", criHandler)
|
||||
}
|
||||
}
|
||||
|
||||
// InstallDebuggingDisabledHandlers registers the HTTP request patterns that provide better error message
|
||||
@@ -785,10 +774,6 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
|
||||
return
|
||||
}
|
||||
|
||||
if s.redirectContainerStreaming {
|
||||
http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
|
||||
return
|
||||
}
|
||||
proxyStream(response.ResponseWriter, request.Request, url)
|
||||
}
|
||||
|
||||
@@ -813,10 +798,6 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
|
||||
streaming.WriteError(err, response.ResponseWriter)
|
||||
return
|
||||
}
|
||||
if s.redirectContainerStreaming {
|
||||
http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
|
||||
return
|
||||
}
|
||||
proxyStream(response.ResponseWriter, request.Request, url)
|
||||
}
|
||||
|
||||
@@ -879,10 +860,6 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
|
||||
streaming.WriteError(err, response.ResponseWriter)
|
||||
return
|
||||
}
|
||||
if s.redirectContainerStreaming {
|
||||
http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
|
||||
return
|
||||
}
|
||||
proxyStream(response.ResponseWriter, request.Request, url)
|
||||
}
|
||||
|
||||
|
@@ -39,7 +39,7 @@ import (
|
||||
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
@@ -48,7 +48,6 @@ import (
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
"k8s.io/client-go/tools/remotecommand"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
@@ -298,19 +297,17 @@ type serverTestFramework struct {
|
||||
fakeKubelet *fakeKubelet
|
||||
fakeAuth *fakeAuth
|
||||
testHTTPServer *httptest.Server
|
||||
criHandler *utiltesting.FakeHandler
|
||||
}
|
||||
|
||||
func newServerTest() *serverTestFramework {
|
||||
return newServerTestWithDebug(true, false, nil)
|
||||
return newServerTestWithDebug(true, nil)
|
||||
}
|
||||
|
||||
func newServerTestWithDebug(enableDebugging, redirectContainerStreaming bool, streamingServer streaming.Server) *serverTestFramework {
|
||||
return newServerTestWithDebuggingHandlers(enableDebugging, enableDebugging, redirectContainerStreaming, streamingServer)
|
||||
func newServerTestWithDebug(enableDebugging bool, streamingServer streaming.Server) *serverTestFramework {
|
||||
return newServerTestWithDebuggingHandlers(enableDebugging, enableDebugging, streamingServer)
|
||||
}
|
||||
|
||||
func newServerTestWithDebuggingHandlers(enableDebugging, enableSystemLogHandler, redirectContainerStreaming bool,
|
||||
streamingServer streaming.Server) *serverTestFramework {
|
||||
func newServerTestWithDebuggingHandlers(enableDebugging, enableSystemLogHandler bool, streamingServer streaming.Server) *serverTestFramework {
|
||||
fw := &serverTestFramework{}
|
||||
fw.fakeKubelet = &fakeKubelet{
|
||||
hostnameFunc: func() string {
|
||||
@@ -339,9 +336,6 @@ func newServerTestWithDebuggingHandlers(enableDebugging, enableSystemLogHandler,
|
||||
return authorizer.DecisionAllow, "", nil
|
||||
},
|
||||
}
|
||||
fw.criHandler = &utiltesting.FakeHandler{
|
||||
StatusCode: http.StatusOK,
|
||||
}
|
||||
server := NewServer(
|
||||
fw.fakeKubelet,
|
||||
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
|
||||
@@ -349,9 +343,7 @@ func newServerTestWithDebuggingHandlers(enableDebugging, enableSystemLogHandler,
|
||||
true,
|
||||
enableDebugging,
|
||||
false,
|
||||
redirectContainerStreaming,
|
||||
enableSystemLogHandler,
|
||||
fw.criHandler)
|
||||
enableSystemLogHandler)
|
||||
fw.serverUnderTest = &server
|
||||
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
|
||||
return fw
|
||||
@@ -1036,7 +1028,7 @@ func TestServeExecInContainerIdleTimeout(t *testing.T) {
|
||||
ss, err := newTestStreamingServer(100 * time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
defer ss.testHTTPServer.Close()
|
||||
fw := newServerTestWithDebug(true, false, ss)
|
||||
fw := newServerTestWithDebug(true, ss)
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
podNamespace := "other"
|
||||
@@ -1077,7 +1069,6 @@ func testExecAttach(t *testing.T, verb string) {
|
||||
tty bool
|
||||
responseStatusCode int
|
||||
uid bool
|
||||
redirect bool
|
||||
}{
|
||||
"no input or output": {responseStatusCode: http.StatusBadRequest},
|
||||
"stdin": {stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
|
||||
@@ -1086,7 +1077,6 @@ func testExecAttach(t *testing.T, verb string) {
|
||||
"stdout and stderr": {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
|
||||
"stdin stdout and stderr": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
|
||||
"stdin stdout stderr with uid": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols, uid: true},
|
||||
"stdout with redirect": {stdout: true, responseStatusCode: http.StatusFound, redirect: true},
|
||||
}
|
||||
|
||||
for desc := range tests {
|
||||
@@ -1095,7 +1085,7 @@ func testExecAttach(t *testing.T, verb string) {
|
||||
ss, err := newTestStreamingServer(0)
|
||||
require.NoError(t, err)
|
||||
defer ss.testHTTPServer.Close()
|
||||
fw := newServerTestWithDebug(true, test.redirect, ss)
|
||||
fw := newServerTestWithDebug(true, ss)
|
||||
defer fw.testHTTPServer.Close()
|
||||
fmt.Println(desc)
|
||||
|
||||
@@ -1203,16 +1193,8 @@ func testExecAttach(t *testing.T, verb string) {
|
||||
upgradeRoundTripper httpstream.UpgradeRoundTripper
|
||||
c *http.Client
|
||||
)
|
||||
if test.redirect {
|
||||
c = &http.Client{}
|
||||
// Don't follow redirects, since we want to inspect the redirect response.
|
||||
c.CheckRedirect = func(*http.Request, []*http.Request) error {
|
||||
return http.ErrUseLastResponse
|
||||
}
|
||||
} else {
|
||||
upgradeRoundTripper = spdy.NewRoundTripper(nil, true, true)
|
||||
c = &http.Client{Transport: upgradeRoundTripper}
|
||||
}
|
||||
upgradeRoundTripper = spdy.NewRoundTripper(nil, true, true)
|
||||
c = &http.Client{Transport: upgradeRoundTripper}
|
||||
|
||||
resp, err = c.Do(makeReq(t, "POST", url, "v4.channel.k8s.io"))
|
||||
require.NoError(t, err, "POSTing")
|
||||
@@ -1299,7 +1281,7 @@ func TestServePortForwardIdleTimeout(t *testing.T) {
|
||||
ss, err := newTestStreamingServer(100 * time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
defer ss.testHTTPServer.Close()
|
||||
fw := newServerTestWithDebug(true, false, ss)
|
||||
fw := newServerTestWithDebug(true, ss)
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
podNamespace := "other"
|
||||
@@ -1335,7 +1317,6 @@ func TestServePortForward(t *testing.T) {
|
||||
uid bool
|
||||
clientData string
|
||||
containerData string
|
||||
redirect bool
|
||||
shouldError bool
|
||||
}{
|
||||
"no port": {port: "", shouldError: true},
|
||||
@@ -1348,7 +1329,6 @@ func TestServePortForward(t *testing.T) {
|
||||
"normal port with data forward": {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
|
||||
"max port": {port: "65535", shouldError: false},
|
||||
"normal port with uid": {port: "8000", uid: true, shouldError: false},
|
||||
"normal port with redirect": {port: "8000", redirect: true, shouldError: false},
|
||||
}
|
||||
|
||||
podNamespace := "other"
|
||||
@@ -1360,7 +1340,7 @@ func TestServePortForward(t *testing.T) {
|
||||
ss, err := newTestStreamingServer(0)
|
||||
require.NoError(t, err)
|
||||
defer ss.testHTTPServer.Close()
|
||||
fw := newServerTestWithDebug(true, test.redirect, ss)
|
||||
fw := newServerTestWithDebug(true, ss)
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
portForwardFuncDone := make(chan struct{})
|
||||
@@ -1408,26 +1388,14 @@ func TestServePortForward(t *testing.T) {
|
||||
c *http.Client
|
||||
)
|
||||
|
||||
if test.redirect {
|
||||
c = &http.Client{}
|
||||
// Don't follow redirects, since we want to inspect the redirect response.
|
||||
c.CheckRedirect = func(*http.Request, []*http.Request) error {
|
||||
return http.ErrUseLastResponse
|
||||
}
|
||||
} else {
|
||||
upgradeRoundTripper = spdy.NewRoundTripper(nil, true, true)
|
||||
c = &http.Client{Transport: upgradeRoundTripper}
|
||||
}
|
||||
upgradeRoundTripper = spdy.NewRoundTripper(nil, true, true)
|
||||
c = &http.Client{Transport: upgradeRoundTripper}
|
||||
|
||||
req := makeReq(t, "POST", url, "portforward.k8s.io")
|
||||
resp, err := c.Do(req)
|
||||
require.NoError(t, err, "POSTing")
|
||||
defer resp.Body.Close()
|
||||
|
||||
if test.redirect {
|
||||
assert.Equal(t, http.StatusFound, resp.StatusCode, "status code")
|
||||
return
|
||||
}
|
||||
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")
|
||||
|
||||
conn, err := upgradeRoundTripper.NewConnection(resp)
|
||||
@@ -1466,22 +1434,6 @@ func TestServePortForward(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCRIHandler(t *testing.T) {
|
||||
fw := newServerTest()
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
const (
|
||||
path = "/cri/exec/123456abcdef"
|
||||
query = "cmd=echo+foo"
|
||||
)
|
||||
resp, err := http.Get(fw.testHTTPServer.URL + path + "?" + query)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
assert.Equal(t, "GET", fw.criHandler.RequestReceived.Method)
|
||||
assert.Equal(t, path, fw.criHandler.RequestReceived.URL.Path)
|
||||
assert.Equal(t, query, fw.criHandler.RequestReceived.URL.RawQuery)
|
||||
}
|
||||
|
||||
func TestMetricBuckets(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
url string
|
||||
@@ -1492,8 +1444,6 @@ func TestMetricBuckets(t *testing.T) {
|
||||
"attach with uid": {url: "/attach/podNamespace/podID/uid/containerName", bucket: "attach"},
|
||||
"configz": {url: "/configz", bucket: "configz"},
|
||||
"containerLogs": {url: "/containerLogs/podNamespace/podID/containerName", bucket: "containerLogs"},
|
||||
"cri": {url: "/cri/", bucket: "cri"},
|
||||
"cri with sub": {url: "/cri/foo", bucket: "cri"},
|
||||
"debug v flags": {url: "/debug/flags/v", bucket: "debug"},
|
||||
"pprof with sub": {url: "/debug/pprof/subpath", bucket: "debug"},
|
||||
"exec": {url: "/exec/podNamespace/podID/containerName", bucket: "exec"},
|
||||
@@ -1556,7 +1506,7 @@ func TestMetricMethodBuckets(t *testing.T) {
|
||||
func TestDebuggingDisabledHandlers(t *testing.T) {
|
||||
// for backward compatibility even if enablesystemLogHandler is set but not enableDebuggingHandler then /logs
|
||||
//shouldn't be served.
|
||||
fw := newServerTestWithDebuggingHandlers(false, true, false, nil)
|
||||
fw := newServerTestWithDebuggingHandlers(false, true, nil)
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
paths := []string{
|
||||
@@ -1600,7 +1550,7 @@ func TestDebuggingDisabledHandlers(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDisablingSystemLogHandler(t *testing.T) {
|
||||
fw := newServerTestWithDebuggingHandlers(true, false, false, nil)
|
||||
fw := newServerTestWithDebuggingHandlers(true, false, nil)
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
// verify logs endpoint is disabled
|
||||
|
@@ -66,7 +66,7 @@ func TestServeWSPortForward(t *testing.T) {
|
||||
ss, err := newTestStreamingServer(0)
|
||||
require.NoError(t, err)
|
||||
defer ss.testHTTPServer.Close()
|
||||
fw := newServerTestWithDebug(true, false, ss)
|
||||
fw := newServerTestWithDebug(true, ss)
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
portForwardFuncDone := make(chan struct{})
|
||||
@@ -158,7 +158,7 @@ func TestServeWSMultiplePortForward(t *testing.T) {
|
||||
ss, err := newTestStreamingServer(0)
|
||||
require.NoError(t, err)
|
||||
defer ss.testHTTPServer.Close()
|
||||
fw := newServerTestWithDebug(true, false, ss)
|
||||
fw := newServerTestWithDebug(true, ss)
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
portForwardWG := sync.WaitGroup{}
|
||||
|
Reference in New Issue
Block a user