Fix lint on pkg/kubelet/server/...

This commit is contained in:
Aldo Culquicondor 2019-02-12 11:44:14 -05:00
parent bad9f6341a
commit e61cd68bf3
20 changed files with 123 additions and 125 deletions

View File

@ -212,10 +212,6 @@ pkg/kubelet/prober/testing
pkg/kubelet/qos pkg/kubelet/qos
pkg/kubelet/remote pkg/kubelet/remote
pkg/kubelet/secret pkg/kubelet/secret
pkg/kubelet/server
pkg/kubelet/server/portforward
pkg/kubelet/server/stats
pkg/kubelet/server/streaming
pkg/kubelet/stats pkg/kubelet/stats
pkg/kubelet/status pkg/kubelet/status
pkg/kubelet/status/testing pkg/kubelet/status/testing

View File

@ -107,7 +107,7 @@ func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncReq
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address. // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if ds.streamingServer == nil { if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("exec") return nil, streaming.NewErrorStreamingDisabled("exec")
} }
_, err := checkContainerStatus(ds.client, req.ContainerId) _, err := checkContainerStatus(ds.client, req.ContainerId)
if err != nil { if err != nil {
@ -119,7 +119,7 @@ func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*
// Attach prepares a streaming endpoint to attach to a running container, and returns the address. // Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
if ds.streamingServer == nil { if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("attach") return nil, streaming.NewErrorStreamingDisabled("attach")
} }
_, err := checkContainerStatus(ds.client, req.ContainerId) _, err := checkContainerStatus(ds.client, req.ContainerId)
if err != nil { if err != nil {
@ -131,7 +131,7 @@ func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (ds *dockerService) PortForward(_ context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { func (ds *dockerService) PortForward(_ context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
if ds.streamingServer == nil { if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("port forward") return nil, streaming.NewErrorStreamingDisabled("port forward")
} }
_, err := checkContainerStatus(ds.client, req.PodSandboxId) _, err := checkContainerStatus(ds.client, req.PodSandboxId)
if err != nil { if err != nil {

View File

@ -59,11 +59,11 @@ var (
) )
type volumeStatsCollector struct { type volumeStatsCollector struct {
statsProvider serverstats.StatsProvider statsProvider serverstats.Provider
} }
// NewVolumeStatsCollector creates a volume stats prometheus collector. // NewVolumeStatsCollector creates a volume stats prometheus collector.
func NewVolumeStatsCollector(statsProvider serverstats.StatsProvider) prometheus.Collector { func NewVolumeStatsCollector(statsProvider serverstats.Provider) prometheus.Collector {
return &volumeStatsCollector{statsProvider: statsProvider} return &volumeStatsCollector{statsProvider: statsProvider}
} }

View File

@ -42,6 +42,7 @@ func NewKubeletAuth(authenticator authenticator.Request, authorizerAttributeGett
return &KubeletAuth{authenticator, authorizerAttributeGetter, authorizer} return &KubeletAuth{authenticator, authorizerAttributeGetter, authorizer}
} }
// NewNodeAuthorizerAttributesGetter creates a new authorizer.RequestAttributesGetter for the node.
func NewNodeAuthorizerAttributesGetter(nodeName types.NodeName) authorizer.RequestAttributesGetter { func NewNodeAuthorizerAttributesGetter(nodeName types.NodeName) authorizer.RequestAttributesGetter {
return nodeAuthorizerAttributesGetter{nodeName: nodeName} return nodeAuthorizerAttributesGetter{nodeName: nodeName}
} }

View File

@ -14,10 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// package portforward contains server-side logic for handling port forwarding requests. // Package portforward contains server-side logic for handling port forwarding requests.
package portforward package portforward
// The subprotocol "portforward.k8s.io" is used for port forwarding. // ProtocolV1Name is the name of the subprotocol used for port forwarding.
const ProtocolV1Name = "portforward.k8s.io" const ProtocolV1Name = "portforward.k8s.io"
// SupportedProtocols are the supported port forwarding protocols.
var SupportedProtocols = []string{ProtocolV1Name} var SupportedProtocols = []string{ProtocolV1Name}

View File

@ -33,7 +33,7 @@ import (
"k8s.io/klog" "k8s.io/klog"
) )
func handleHttpStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error { func handleHTTPStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
_, err := httpstream.Handshake(req, w, supportedPortForwardProtocols) _, err := httpstream.Handshake(req, w, supportedPortForwardProtocols)
// negotiated protocol isn't currently used server side, but could be in the future // negotiated protocol isn't currently used server side, but could be in the future
if err != nil { if err != nil {

View File

@ -63,7 +63,7 @@ func TestHTTPStreamReceived(t *testing.T) {
for name, test := range tests { for name, test := range tests {
streams := make(chan httpstream.Stream, 1) streams := make(chan httpstream.Stream, 1)
f := httpStreamReceived(streams) f := httpStreamReceived(streams)
stream := newFakeHttpStream() stream := newFakeHTTPStream()
if len(test.port) > 0 { if len(test.port) > 0 {
stream.headers.Set("port", test.port) stream.headers.Set("port", test.port)
} }
@ -135,7 +135,7 @@ func TestGetStreamPair(t *testing.T) {
} }
// removed via complete // removed via complete
dataStream := newFakeHttpStream() dataStream := newFakeHTTPStream()
dataStream.headers.Set(api.StreamType, api.StreamTypeData) dataStream.headers.Set(api.StreamType, api.StreamTypeData)
complete, err := p.add(dataStream) complete, err := p.add(dataStream)
if err != nil { if err != nil {
@ -145,7 +145,7 @@ func TestGetStreamPair(t *testing.T) {
t.Fatalf("unexpected complete") t.Fatalf("unexpected complete")
} }
errorStream := newFakeHttpStream() errorStream := newFakeHTTPStream()
errorStream.headers.Set(api.StreamType, api.StreamTypeError) errorStream.headers.Set(api.StreamType, api.StreamTypeError)
complete, err = p.add(errorStream) complete, err = p.add(errorStream)
if err != nil { if err != nil {
@ -188,7 +188,7 @@ func TestGetStreamPair(t *testing.T) {
func TestRequestID(t *testing.T) { func TestRequestID(t *testing.T) {
h := &httpStreamHandler{} h := &httpStreamHandler{}
s := newFakeHttpStream() s := newFakeHTTPStream()
s.headers.Set(api.StreamType, api.StreamTypeError) s.headers.Set(api.StreamType, api.StreamTypeError)
s.id = 1 s.id = 1
if e, a := "1", h.requestID(s); e != a { if e, a := "1", h.requestID(s); e != a {
@ -208,39 +208,39 @@ func TestRequestID(t *testing.T) {
} }
} }
type fakeHttpStream struct { type fakeHTTPStream struct {
headers http.Header headers http.Header
id uint32 id uint32
} }
func newFakeHttpStream() *fakeHttpStream { func newFakeHTTPStream() *fakeHTTPStream {
return &fakeHttpStream{ return &fakeHTTPStream{
headers: make(http.Header), headers: make(http.Header),
} }
} }
var _ httpstream.Stream = &fakeHttpStream{} var _ httpstream.Stream = &fakeHTTPStream{}
func (s *fakeHttpStream) Read(data []byte) (int, error) { func (s *fakeHTTPStream) Read(data []byte) (int, error) {
return 0, nil return 0, nil
} }
func (s *fakeHttpStream) Write(data []byte) (int, error) { func (s *fakeHTTPStream) Write(data []byte) (int, error) {
return 0, nil return 0, nil
} }
func (s *fakeHttpStream) Close() error { func (s *fakeHTTPStream) Close() error {
return nil return nil
} }
func (s *fakeHttpStream) Reset() error { func (s *fakeHTTPStream) Reset() error {
return nil return nil
} }
func (s *fakeHttpStream) Headers() http.Header { func (s *fakeHTTPStream) Headers() http.Header {
return s.headers return s.headers
} }
func (s *fakeHttpStream) Identifier() uint32 { func (s *fakeHTTPStream) Identifier() uint32 {
return s.id return s.id
} }

View File

@ -43,7 +43,7 @@ func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder Po
if wsstream.IsWebSocketRequest(req) { if wsstream.IsWebSocketRequest(req) {
err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout) err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
} else { } else {
err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout) err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
} }
if err != nil { if err != nil {

View File

@ -43,15 +43,15 @@ const (
v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
) )
// options contains details about which streams are required for // V4Options contains details about which streams are required for port
// port forwarding. // forwarding.
// All fields included in V4Options need to be expressed explicitly in the // All fields included in V4Options need to be expressed explicitly in the
// CRI (pkg/kubelet/apis/cri/{version}/api.proto) PortForwardRequest. // CRI (pkg/kubelet/apis/cri/{version}/api.proto) PortForwardRequest.
type V4Options struct { type V4Options struct {
Ports []int32 Ports []int32
} }
// newOptions creates a new options from the Request. // NewV4Options creates a new options from the Request.
func NewV4Options(req *http.Request) (*V4Options, error) { func NewV4Options(req *http.Request) (*V4Options, error) {
if !wsstream.IsWebSocketRequest(req) { if !wsstream.IsWebSocketRequest(req) {
return &V4Options{}, nil return &V4Options{}, nil

View File

@ -88,6 +88,7 @@ type Server struct {
redirectContainerStreaming bool redirectContainerStreaming bool
} }
// TLSOptions holds the TLS options.
type TLSOptions struct { type TLSOptions struct {
Config *tls.Config Config *tls.Config
CertFile string CertFile string
@ -165,7 +166,7 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st
klog.Fatal(server.ListenAndServe()) klog.Fatal(server.ListenAndServe())
} }
// ListenAndServePodResources initializes a grpc server to serve the PodResources service // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) { func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
server := grpc.NewServer() server := grpc.NewServer()
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider)) podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider))
@ -186,7 +187,7 @@ type AuthInterface interface {
// HostInterface contains all the kubelet methods required by the server. // HostInterface contains all the kubelet methods required by the server.
// For testability. // For testability.
type HostInterface interface { type HostInterface interface {
stats.StatsProvider stats.Provider
GetVersionInfo() (*cadvisorapi.VersionInfo, error) GetVersionInfo() (*cadvisorapi.VersionInfo, error)
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
GetRunningPods() ([]*v1.Pod, error) GetRunningPods() ([]*v1.Pod, error)
@ -533,7 +534,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
pod, ok := s.host.GetPodByName(podNamespace, podID) pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok { if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist\n", podID)) response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist", podID))
return return
} }
// Check if containerName is valid. // Check if containerName is valid.
@ -553,12 +554,12 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
} }
} }
if !containerExists { if !containerExists {
response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q\n", containerName, podID)) response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q", containerName, podID))
return return
} }
if _, ok := response.ResponseWriter.(http.Flusher); !ok { if _, ok := response.ResponseWriter.(http.Flusher); !ok {
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs\n", reflect.TypeOf(response))) response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
return return
} }
fw := flushwriter.Wrap(response.ResponseWriter) fw := flushwriter.Wrap(response.ResponseWriter)
@ -591,7 +592,7 @@ func (s *Server) getPods(request *restful.Request, response *restful.Response) {
response.WriteError(http.StatusInternalServerError, err) response.WriteError(http.StatusInternalServerError, err)
return return
} }
writeJsonResponse(response, data) writeJSONResponse(response, data)
} }
// getRunningPods returns a list of pods running on Kubelet. The list is // getRunningPods returns a list of pods running on Kubelet. The list is
@ -608,7 +609,7 @@ func (s *Server) getRunningPods(request *restful.Request, response *restful.Resp
response.WriteError(http.StatusInternalServerError, err) response.WriteError(http.StatusInternalServerError, err)
return return
} }
writeJsonResponse(response, data) writeJSONResponse(response, data)
} }
// getLogs handles logs requests against the Kubelet. // getLogs handles logs requests against the Kubelet.
@ -747,11 +748,11 @@ func (s *Server) getRun(request *restful.Request, response *restful.Response) {
response.WriteError(http.StatusInternalServerError, err) response.WriteError(http.StatusInternalServerError, err)
return return
} }
writeJsonResponse(response, data) writeJSONResponse(response, data)
} }
// Derived from go-restful writeJSON. // Derived from go-restful writeJSON.
func writeJsonResponse(response *restful.Response, data []byte) { func writeJSONResponse(response *restful.Response, data []byte) {
if data == nil { if data == nil {
response.WriteHeader(http.StatusOK) response.WriteHeader(http.StatusOK)
// do not write a nil representation // do not write a nil representation
@ -834,7 +835,7 @@ func (a prometheusHostAdapter) GetMachineInfo() (*cadvisorapi.MachineInfo, error
return a.host.GetCachedMachineInfo() return a.host.GetCachedMachineInfo()
} }
func containerPrometheusLabelsFunc(s stats.StatsProvider) metrics.ContainerLabelsFunc { func containerPrometheusLabelsFunc(s stats.Provider) metrics.ContainerLabelsFunc {
// containerPrometheusLabels maps cAdvisor labels to prometheus labels. // containerPrometheusLabels maps cAdvisor labels to prometheus labels.
return func(c *cadvisorapi.ContainerInfo) map[string]string { return func(c *cadvisorapi.ContainerInfo) map[string]string {
// Prometheus requires that all metrics in the same family have the same labels, // Prometheus requires that all metrics in the same family have the same labels,

View File

@ -115,7 +115,7 @@ func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
return fk.machineInfoFunc() return fk.machineInfoFunc()
} }
func (_ *fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) { func (*fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
return &cadvisorapi.VersionInfo{}, nil return &cadvisorapi.VersionInfo{}, nil
} }
@ -249,23 +249,23 @@ func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types
} }
// Unused functions // Unused functions
func (_ *fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil } func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
func (_ *fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} } func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
func (_ *fakeKubelet) GetPodCgroupRoot() string { return "" } func (*fakeKubelet) GetPodCgroupRoot() string { return "" }
func (_ *fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false } func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false }
func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) { func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
return map[string]volume.Volume{}, true return map[string]volume.Volume{}, true
} }
func (_ *fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil } func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (_ *fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil } func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
func (_ *fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil } func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
func (_ *fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil } func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (_ *fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil } func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
func (_ *fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) { func (*fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
return nil, nil, nil return nil, nil, nil
} }
func (_ *fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) { func (*fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
return nil, nil return nil, nil
} }
@ -744,7 +744,7 @@ func TestAuthFilters(t *testing.T) {
The kubelet API has likely registered a handler for a new path. The kubelet API has likely registered a handler for a new path.
If the new path has a use case for partitioned authorization when requested from the kubelet API, If the new path has a use case for partitioned authorization when requested from the kubelet API,
add a specific subresource for it in auth.go#GetRequestAttributes() and in TestAuthFilters(). add a specific subresource for it in auth.go#GetRequestAttributes() and in TestAuthFilters().
Otherwise, add it to the expected list of paths that map to the "proxy" subresource in TestAuthFilters().`, path)) Otherwise, add it to the expected list of paths that map to the "proxy" subresource in TestAuthFilters()`, path))
} }
} }
attributesGetter := NewNodeAuthorizerAttributesGetter(types.NodeName("test")) attributesGetter := NewNodeAuthorizerAttributesGetter(types.NodeName("test"))
@ -1553,9 +1553,8 @@ func TestServePortForward(t *testing.T) {
if test.redirect { if test.redirect {
assert.Equal(t, http.StatusFound, resp.StatusCode, "status code") assert.Equal(t, http.StatusFound, resp.StatusCode, "status code")
return return
} else {
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")
} }
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")
conn, err := upgradeRoundTripper.NewConnection(resp) conn, err := upgradeRoundTripper.NewConnection(resp)
require.NoError(t, err, "creating streaming connection") require.NoError(t, err, "creating streaming connection")

View File

@ -27,9 +27,7 @@ import (
"k8s.io/klog" "k8s.io/klog"
) )
// Map to PodVolumeStats pointers since the addresses for map values are not constant and can cause pain type statCache map[types.UID]*volumeStatCalculator
// if we need ever to get a pointer to one of the values (e.g. you can't)
type Cache map[types.UID]*volumeStatCalculator
// fsResourceAnalyzerInterface is for embedding fs functions into ResourceAnalyzer // fsResourceAnalyzerInterface is for embedding fs functions into ResourceAnalyzer
type fsResourceAnalyzerInterface interface { type fsResourceAnalyzerInterface interface {
@ -38,7 +36,7 @@ type fsResourceAnalyzerInterface interface {
// fsResourceAnalyzer provides stats about fs resource usage // fsResourceAnalyzer provides stats about fs resource usage
type fsResourceAnalyzer struct { type fsResourceAnalyzer struct {
statsProvider StatsProvider statsProvider Provider
calcPeriod time.Duration calcPeriod time.Duration
cachedVolumeStats atomic.Value cachedVolumeStats atomic.Value
startOnce sync.Once startOnce sync.Once
@ -47,12 +45,12 @@ type fsResourceAnalyzer struct {
var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{} var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
// newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation // newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation
func newFsResourceAnalyzer(statsProvider StatsProvider, calcVolumePeriod time.Duration) *fsResourceAnalyzer { func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
r := &fsResourceAnalyzer{ r := &fsResourceAnalyzer{
statsProvider: statsProvider, statsProvider: statsProvider,
calcPeriod: calcVolumePeriod, calcPeriod: calcVolumePeriod,
} }
r.cachedVolumeStats.Store(make(Cache)) r.cachedVolumeStats.Store(make(statCache))
return r return r
} }
@ -70,8 +68,8 @@ func (s *fsResourceAnalyzer) Start() {
// updateCachedPodVolumeStats calculates and caches the PodVolumeStats for every Pod known to the kubelet. // updateCachedPodVolumeStats calculates and caches the PodVolumeStats for every Pod known to the kubelet.
func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() { func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
oldCache := s.cachedVolumeStats.Load().(Cache) oldCache := s.cachedVolumeStats.Load().(statCache)
newCache := make(Cache) newCache := make(statCache)
// Copy existing entries to new map, creating/starting new entries for pods missing from the cache // Copy existing entries to new map, creating/starting new entries for pods missing from the cache
for _, pod := range s.statsProvider.GetPods() { for _, pod := range s.statsProvider.GetPods() {
@ -96,12 +94,12 @@ func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
// GetPodVolumeStats returns the PodVolumeStats for a given pod. Results are looked up from a cache that // GetPodVolumeStats returns the PodVolumeStats for a given pod. Results are looked up from a cache that
// is eagerly populated in the background, and never calculated on the fly. // is eagerly populated in the background, and never calculated on the fly.
func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) { func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) {
cache := s.cachedVolumeStats.Load().(Cache) cache := s.cachedVolumeStats.Load().(statCache)
if statCalc, found := cache[uid]; !found { statCalc, found := cache[uid]
if !found {
// TODO: Differentiate between stats being empty // TODO: Differentiate between stats being empty
// See issue #20679 // See issue #20679
return PodVolumeStats{}, false return PodVolumeStats{}, false
} else {
return statCalc.GetLatest()
} }
return statCalc.GetLatest()
} }

View File

@ -37,8 +37,8 @@ import (
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
// Host methods required by stats handlers. // Provider hosts methods required by stats handlers.
type StatsProvider interface { type Provider interface {
// The following stats are provided by either CRI or cAdvisor. // The following stats are provided by either CRI or cAdvisor.
// //
// ListPodStats returns the stats of all the containers managed by pods. // ListPodStats returns the stats of all the containers managed by pods.
@ -96,11 +96,12 @@ type StatsProvider interface {
} }
type handler struct { type handler struct {
provider StatsProvider provider Provider
summaryProvider SummaryProvider summaryProvider SummaryProvider
} }
func CreateHandlers(rootPath string, provider StatsProvider, summaryProvider SummaryProvider) *restful.WebService { // CreateHandlers creates the REST handlers for the stats.
func CreateHandlers(rootPath string, provider Provider, summaryProvider SummaryProvider) *restful.WebService {
h := &handler{provider, summaryProvider} h := &handler{provider, summaryProvider}
ws := &restful.WebService{} ws := &restful.WebService{}
@ -130,7 +131,7 @@ func CreateHandlers(rootPath string, provider StatsProvider, summaryProvider Sum
return ws return ws
} }
type StatsRequest struct { type statsRequest struct {
// The name of the container for which to request stats. // The name of the container for which to request stats.
// Default: / // Default: /
// +optional // +optional
@ -158,7 +159,7 @@ type StatsRequest struct {
Subcontainers bool `json:"subcontainers,omitempty"` Subcontainers bool `json:"subcontainers,omitempty"`
} }
func (r *StatsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest { func (r *statsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest {
return &cadvisorapi.ContainerInfoRequest{ return &cadvisorapi.ContainerInfoRequest{
NumStats: r.NumStats, NumStats: r.NumStats,
Start: r.Start, Start: r.Start,
@ -166,9 +167,9 @@ func (r *StatsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest {
} }
} }
func parseStatsRequest(request *restful.Request) (StatsRequest, error) { func parseStatsRequest(request *restful.Request) (statsRequest, error) {
// Default request. // Default request.
query := StatsRequest{ query := statsRequest{
NumStats: 60, NumStats: 60,
} }

View File

@ -37,7 +37,7 @@ type resourceAnalyzer struct {
var _ ResourceAnalyzer = &resourceAnalyzer{} var _ ResourceAnalyzer = &resourceAnalyzer{}
// NewResourceAnalyzer returns a new ResourceAnalyzer // NewResourceAnalyzer returns a new ResourceAnalyzer
func NewResourceAnalyzer(statsProvider StatsProvider, calVolumeFrequency time.Duration) ResourceAnalyzer { func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration) ResourceAnalyzer {
fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency) fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency)
summaryProvider := NewSummaryProvider(statsProvider) summaryProvider := NewSummaryProvider(statsProvider)
return &resourceAnalyzer{fsAnalyzer, summaryProvider} return &resourceAnalyzer{fsAnalyzer, summaryProvider}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util"
) )
// SummaryProvider provides summaries of the stats from Kubelet.
type SummaryProvider interface { type SummaryProvider interface {
// Get provides a new Summary with the stats from Kubelet, // Get provides a new Summary with the stats from Kubelet,
// and will update some stats if updateStats is true // and will update some stats if updateStats is true
@ -41,14 +42,14 @@ type summaryProviderImpl struct {
// systemBootTime is the time at which the system was started // systemBootTime is the time at which the system was started
systemBootTime metav1.Time systemBootTime metav1.Time
provider StatsProvider provider Provider
} }
var _ SummaryProvider = &summaryProviderImpl{} var _ SummaryProvider = &summaryProviderImpl{}
// NewSummaryProvider returns a SummaryProvider using the stats provided by the // NewSummaryProvider returns a SummaryProvider using the stats provided by the
// specified statsProvider. // specified statsProvider.
func NewSummaryProvider(statsProvider StatsProvider) SummaryProvider { func NewSummaryProvider(statsProvider Provider) SummaryProvider {
kubeletCreationTime := metav1.Now() kubeletCreationTime := metav1.Now()
bootTime, err := util.GetBootTime() bootTime, err := util.GetBootTime()
if err != nil { if err != nil {

View File

@ -32,7 +32,7 @@ import (
// volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result // volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result
type volumeStatCalculator struct { type volumeStatCalculator struct {
statsProvider StatsProvider statsProvider Provider
jitterPeriod time.Duration jitterPeriod time.Duration
pod *v1.Pod pod *v1.Pod
stopChannel chan struct{} stopChannel chan struct{}
@ -49,7 +49,7 @@ type PodVolumeStats struct {
} }
// newVolumeStatCalculator creates a new VolumeStatCalculator // newVolumeStatCalculator creates a new VolumeStatCalculator
func newVolumeStatCalculator(statsProvider StatsProvider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator { func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator {
return &volumeStatCalculator{ return &volumeStatCalculator{
statsProvider: statsProvider, statsProvider: statsProvider,
jitterPeriod: jitterPeriod, jitterPeriod: jitterPeriod,
@ -79,11 +79,11 @@ func (s *volumeStatCalculator) StopOnce() *volumeStatCalculator {
// getLatest returns the most recent PodVolumeStats from the cache // getLatest returns the most recent PodVolumeStats from the cache
func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) { func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) {
if result := s.latest.Load(); result == nil { result := s.latest.Load()
if result == nil {
return PodVolumeStats{}, false return PodVolumeStats{}, false
} else {
return result.(PodVolumeStats), true
} }
return result.(PodVolumeStats), true
} }
// calcAndStoreStats calculates PodVolumeStats for a given pod and writes the result to the s.latest cache. // calcAndStoreStats calculates PodVolumeStats for a given pod and writes the result to the s.latest cache.
@ -102,8 +102,8 @@ func (s *volumeStatCalculator) calcAndStoreStats() {
} }
// Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats // Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats
ephemeralStats := []stats.VolumeStats{} var ephemeralStats []stats.VolumeStats
persistentStats := []stats.VolumeStats{} var persistentStats []stats.VolumeStats
for name, v := range volumes { for name, v := range volumes {
metric, err := v.GetMetrics() metric, err := v.GetMetrics()
if err != nil { if err != nil {

View File

@ -17,7 +17,6 @@ limitations under the License.
package streaming package streaming
import ( import (
"fmt"
"net/http" "net/http"
"strconv" "strconv"
@ -26,16 +25,17 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
func ErrorStreamingDisabled(method string) error { // NewErrorStreamingDisabled creates an error for disabled streaming method.
return status.Errorf(codes.NotFound, fmt.Sprintf("streaming method %s disabled", method)) func NewErrorStreamingDisabled(method string) error {
return status.Errorf(codes.NotFound, "streaming method %s disabled", method)
} }
// The error returned when the maximum number of in-flight requests is exceeded. // NewErrorTooManyInFlight creates an error for exceeding the maximum number of in-flight requests.
func ErrorTooManyInFlight() error { func NewErrorTooManyInFlight() error {
return status.Errorf(codes.ResourceExhausted, "maximum number of in-flight requests exceeded") return status.Error(codes.ResourceExhausted, "maximum number of in-flight requests exceeded")
} }
// Translates a CRI streaming error into an appropriate HTTP response. // WriteError translates a CRI streaming error into an appropriate HTTP response.
func WriteError(err error, w http.ResponseWriter) error { func WriteError(err error, w http.ResponseWriter) error {
var status int var status int
switch grpc.Code(err) { switch grpc.Code(err) {
@ -43,9 +43,9 @@ func WriteError(err error, w http.ResponseWriter) error {
status = http.StatusNotFound status = http.StatusNotFound
case codes.ResourceExhausted: case codes.ResourceExhausted:
// We only expect to hit this if there is a DoS, so we just wait the full TTL. // We only expect to hit this if there is a DoS, so we just wait the full TTL.
// If this is ever hit in steady-state operations, consider increasing the MaxInFlight requests, // If this is ever hit in steady-state operations, consider increasing the maxInFlight requests,
// or plumbing through the time to next expiration. // or plumbing through the time to next expiration.
w.Header().Set("Retry-After", strconv.Itoa(int(CacheTTL.Seconds()))) w.Header().Set("Retry-After", strconv.Itoa(int(cacheTTL.Seconds())))
status = http.StatusTooManyRequests status = http.StatusTooManyRequests
default: default:
status = http.StatusInternalServerError status = http.StatusInternalServerError

View File

@ -29,12 +29,12 @@ import (
) )
var ( var (
// Timeout after which tokens become invalid. // cacheTTL is the timeout after which tokens become invalid.
CacheTTL = 1 * time.Minute cacheTTL = 1 * time.Minute
// The maximum number of in-flight requests to allow. // maxInFlight is the maximum number of in-flight requests to allow.
MaxInFlight = 1000 maxInFlight = 1000
// Length of the random base64 encoded token identifying the request. // tokenLen is the length of the random base64 encoded token identifying the request.
TokenLen = 8 tokenLen = 8
) )
// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use // requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
@ -77,14 +77,14 @@ func (c *requestCache) Insert(req request) (token string, err error) {
// Remove expired entries. // Remove expired entries.
c.gc() c.gc()
// If the cache is full, reject the request. // If the cache is full, reject the request.
if c.ll.Len() == MaxInFlight { if c.ll.Len() == maxInFlight {
return "", ErrorTooManyInFlight() return "", NewErrorTooManyInFlight()
} }
token, err = c.uniqueToken() token, err = c.uniqueToken()
if err != nil { if err != nil {
return "", err return "", err
} }
ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(CacheTTL)}) ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(cacheTTL)})
c.tokens[token] = ele c.tokens[token] = ele
return token, nil return token, nil
@ -112,15 +112,15 @@ func (c *requestCache) Consume(token string) (req request, found bool) {
// uniqueToken generates a random URL-safe token and ensures uniqueness. // uniqueToken generates a random URL-safe token and ensures uniqueness.
func (c *requestCache) uniqueToken() (string, error) { func (c *requestCache) uniqueToken() (string, error) {
const maxTries = 10 const maxTries = 10
// Number of bytes to be TokenLen when base64 encoded. // Number of bytes to be tokenLen when base64 encoded.
tokenSize := math.Ceil(float64(TokenLen) * 6 / 8) tokenSize := math.Ceil(float64(tokenLen) * 6 / 8)
rawToken := make([]byte, int(tokenSize)) rawToken := make([]byte, int(tokenSize))
for i := 0; i < maxTries; i++ { for i := 0; i < maxTries; i++ {
if _, err := rand.Read(rawToken); err != nil { if _, err := rand.Read(rawToken); err != nil {
return "", err return "", err
} }
encoded := base64.RawURLEncoding.EncodeToString(rawToken) encoded := base64.RawURLEncoding.EncodeToString(rawToken)
token := encoded[:TokenLen] token := encoded[:tokenLen]
// If it's unique, return it. Otherwise retry. // If it's unique, return it. Otherwise retry.
if _, exists := c.tokens[encoded]; !exists { if _, exists := c.tokens[encoded]; !exists {
return token, nil return token, nil

View File

@ -35,22 +35,22 @@ func TestInsert(t *testing.T) {
// Insert normal // Insert normal
oldestTok, err := c.Insert(nextRequest()) oldestTok, err := c.Insert(nextRequest())
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, oldestTok, TokenLen) assert.Len(t, oldestTok, tokenLen)
assertCacheSize(t, c, 1) assertCacheSize(t, c, 1)
// Insert until full // Insert until full
for i := 0; i < MaxInFlight-2; i++ { for i := 0; i < maxInFlight-2; i++ {
tok, err := c.Insert(nextRequest()) tok, err := c.Insert(nextRequest())
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, tok, TokenLen) assert.Len(t, tok, tokenLen)
} }
assertCacheSize(t, c, MaxInFlight-1) assertCacheSize(t, c, maxInFlight-1)
newestReq := nextRequest() newestReq := nextRequest()
newestTok, err := c.Insert(newestReq) newestTok, err := c.Insert(newestReq)
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, newestTok, TokenLen) assert.Len(t, newestTok, tokenLen)
assertCacheSize(t, c, MaxInFlight) assertCacheSize(t, c, maxInFlight)
require.Contains(t, c.tokens, oldestTok, "oldest request should still be cached") require.Contains(t, c.tokens, oldestTok, "oldest request should still be cached")
// Consume newest token. // Consume newest token.
@ -62,8 +62,8 @@ func TestInsert(t *testing.T) {
// Insert again (still full) // Insert again (still full)
tok, err := c.Insert(nextRequest()) tok, err := c.Insert(nextRequest())
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, tok, TokenLen) assert.Len(t, tok, tokenLen)
assertCacheSize(t, c, MaxInFlight) assertCacheSize(t, c, maxInFlight)
// Insert again (should evict) // Insert again (should evict)
_, err = c.Insert(nextRequest()) _, err = c.Insert(nextRequest())
@ -71,9 +71,9 @@ func TestInsert(t *testing.T) {
errResponse := httptest.NewRecorder() errResponse := httptest.NewRecorder()
require.NoError(t, WriteError(err, errResponse)) require.NoError(t, WriteError(err, errResponse))
assert.Equal(t, errResponse.Code, http.StatusTooManyRequests) assert.Equal(t, errResponse.Code, http.StatusTooManyRequests)
assert.Equal(t, strconv.Itoa(int(CacheTTL.Seconds())), errResponse.HeaderMap.Get("Retry-After")) assert.Equal(t, strconv.Itoa(int(cacheTTL.Seconds())), errResponse.HeaderMap.Get("Retry-After"))
assertCacheSize(t, c, MaxInFlight) assertCacheSize(t, c, maxInFlight)
_, ok = c.Consume(oldestTok) _, ok = c.Consume(oldestTok)
assert.True(t, ok, "oldest request should be valid") assert.True(t, ok, "oldest request should be valid")
} }
@ -142,7 +142,7 @@ func TestConsume(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
assertCacheSize(t, c, 1) assertCacheSize(t, c, 1)
clock.Step(2 * CacheTTL) clock.Step(2 * cacheTTL)
_, ok := c.Consume(tok) _, ok := c.Consume(tok)
assert.False(t, ok) assert.False(t, ok)
@ -167,7 +167,7 @@ func TestGC(t *testing.T) {
// expired: tok1, tok2 // expired: tok1, tok2
// non-expired: tok3, tok4 // non-expired: tok3, tok4
clock.Step(2 * CacheTTL) clock.Step(2 * cacheTTL)
tok3, err := c.Insert(nextRequest()) tok3, err := c.Insert(nextRequest())
require.NoError(t, err) require.NoError(t, err)
assertCacheSize(t, c, 1) assertCacheSize(t, c, 1)
@ -186,14 +186,14 @@ func TestGC(t *testing.T) {
assert.True(t, ok) assert.True(t, ok)
// When full, nothing is expired. // When full, nothing is expired.
for i := 0; i < MaxInFlight; i++ { for i := 0; i < maxInFlight; i++ {
_, err := c.Insert(nextRequest()) _, err := c.Insert(nextRequest())
require.NoError(t, err) require.NoError(t, err)
} }
assertCacheSize(t, c, MaxInFlight) assertCacheSize(t, c, maxInFlight)
// When everything is expired // When everything is expired
clock.Step(2 * CacheTTL) clock.Step(2 * cacheTTL)
_, err = c.Insert(nextRequest()) _, err = c.Insert(nextRequest())
require.NoError(t, err) require.NoError(t, err)
assertCacheSize(t, c, 1) assertCacheSize(t, c, 1)

View File

@ -39,7 +39,7 @@ import (
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
) )
// The library interface to serve the stream requests. // Server is the library interface to serve the stream requests.
type Server interface { type Server interface {
http.Handler http.Handler
@ -59,7 +59,7 @@ type Server interface {
Stop() error Stop() error
} }
// The interface to execute the commands and provide the streams. // Runtime is the interface to execute the commands and provide the streams.
type Runtime interface { type Runtime interface {
Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
@ -103,6 +103,7 @@ var DefaultConfig = Config{
SupportedPortForwardProtocols: portforward.SupportedProtocols, SupportedPortForwardProtocols: portforward.SupportedProtocols,
} }
// NewServer creates a new Server for stream requests.
// TODO(tallclair): Add auth(n/z) interface & handling. // TODO(tallclair): Add auth(n/z) interface & handling.
func NewServer(config Config, runtime Runtime) (Server, error) { func NewServer(config Config, runtime Runtime) (Server, error) {
s := &server{ s := &server{
@ -243,9 +244,8 @@ func (s *server) Start(stayUp bool) error {
s.config.BaseURL.Host = listener.Addr().String() s.config.BaseURL.Host = listener.Addr().String()
if s.config.TLSConfig != nil { if s.config.TLSConfig != nil {
return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig. return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
} else {
return s.server.Serve(listener)
} }
return s.server.Serve(listener)
} }
func (s *server) Stop() error { func (s *server) Stop() error {