diff --git a/hack/.golint_failures b/hack/.golint_failures index 626ccc0b795..ef8f284c2b5 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -209,10 +209,6 @@ pkg/kubelet/prober/testing pkg/kubelet/qos pkg/kubelet/remote pkg/kubelet/secret -pkg/kubelet/server -pkg/kubelet/server/portforward -pkg/kubelet/server/stats -pkg/kubelet/server/streaming pkg/kubelet/stats pkg/kubelet/status pkg/kubelet/status/testing diff --git a/pkg/kubelet/dockershim/docker_streaming.go b/pkg/kubelet/dockershim/docker_streaming.go index 1c4dc813b08..bc767d063ef 100644 --- a/pkg/kubelet/dockershim/docker_streaming.go +++ b/pkg/kubelet/dockershim/docker_streaming.go @@ -107,7 +107,7 @@ func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncReq // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { if ds.streamingServer == nil { - return nil, streaming.ErrorStreamingDisabled("exec") + return nil, streaming.NewErrorStreamingDisabled("exec") } _, err := checkContainerStatus(ds.client, req.ContainerId) if err != nil { @@ -119,7 +119,7 @@ func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (* // Attach prepares a streaming endpoint to attach to a running container, and returns the address. func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { if ds.streamingServer == nil { - return nil, streaming.ErrorStreamingDisabled("attach") + return nil, streaming.NewErrorStreamingDisabled("attach") } _, err := checkContainerStatus(ds.client, req.ContainerId) if err != nil { @@ -131,7 +131,7 @@ func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. func (ds *dockerService) PortForward(_ context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { if ds.streamingServer == nil { - return nil, streaming.ErrorStreamingDisabled("port forward") + return nil, streaming.NewErrorStreamingDisabled("port forward") } _, err := checkContainerStatus(ds.client, req.PodSandboxId) if err != nil { diff --git a/pkg/kubelet/metrics/collectors/volume_stats.go b/pkg/kubelet/metrics/collectors/volume_stats.go index c281aaea57f..78e047527c1 100644 --- a/pkg/kubelet/metrics/collectors/volume_stats.go +++ b/pkg/kubelet/metrics/collectors/volume_stats.go @@ -59,11 +59,11 @@ var ( ) type volumeStatsCollector struct { - statsProvider serverstats.StatsProvider + statsProvider serverstats.Provider } // NewVolumeStatsCollector creates a volume stats prometheus collector. -func NewVolumeStatsCollector(statsProvider serverstats.StatsProvider) prometheus.Collector { +func NewVolumeStatsCollector(statsProvider serverstats.Provider) prometheus.Collector { return &volumeStatsCollector{statsProvider: statsProvider} } diff --git a/pkg/kubelet/server/auth.go b/pkg/kubelet/server/auth.go index 8b8805cf5cf..285c3c4c195 100644 --- a/pkg/kubelet/server/auth.go +++ b/pkg/kubelet/server/auth.go @@ -42,6 +42,7 @@ func NewKubeletAuth(authenticator authenticator.Request, authorizerAttributeGett return &KubeletAuth{authenticator, authorizerAttributeGetter, authorizer} } +// NewNodeAuthorizerAttributesGetter creates a new authorizer.RequestAttributesGetter for the node. func NewNodeAuthorizerAttributesGetter(nodeName types.NodeName) authorizer.RequestAttributesGetter { return nodeAuthorizerAttributesGetter{nodeName: nodeName} } diff --git a/pkg/kubelet/server/portforward/constants.go b/pkg/kubelet/server/portforward/constants.go index e7ccd58ae29..62b14f2051a 100644 --- a/pkg/kubelet/server/portforward/constants.go +++ b/pkg/kubelet/server/portforward/constants.go @@ -14,10 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -// package portforward contains server-side logic for handling port forwarding requests. +// Package portforward contains server-side logic for handling port forwarding requests. package portforward -// The subprotocol "portforward.k8s.io" is used for port forwarding. +// ProtocolV1Name is the name of the subprotocol used for port forwarding. const ProtocolV1Name = "portforward.k8s.io" +// SupportedProtocols are the supported port forwarding protocols. var SupportedProtocols = []string{ProtocolV1Name} diff --git a/pkg/kubelet/server/portforward/httpstream.go b/pkg/kubelet/server/portforward/httpstream.go index 43393bd57ae..df122ee9368 100644 --- a/pkg/kubelet/server/portforward/httpstream.go +++ b/pkg/kubelet/server/portforward/httpstream.go @@ -33,7 +33,7 @@ import ( "k8s.io/klog" ) -func handleHttpStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error { +func handleHTTPStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error { _, err := httpstream.Handshake(req, w, supportedPortForwardProtocols) // negotiated protocol isn't currently used server side, but could be in the future if err != nil { diff --git a/pkg/kubelet/server/portforward/httpstream_test.go b/pkg/kubelet/server/portforward/httpstream_test.go index 7f815036b9f..26e6905bbbc 100644 --- a/pkg/kubelet/server/portforward/httpstream_test.go +++ b/pkg/kubelet/server/portforward/httpstream_test.go @@ -63,7 +63,7 @@ func TestHTTPStreamReceived(t *testing.T) { for name, test := range tests { streams := make(chan httpstream.Stream, 1) f := httpStreamReceived(streams) - stream := newFakeHttpStream() + stream := newFakeHTTPStream() if len(test.port) > 0 { stream.headers.Set("port", test.port) } @@ -135,7 +135,7 @@ func TestGetStreamPair(t *testing.T) { } // removed via complete - dataStream := newFakeHttpStream() + dataStream := newFakeHTTPStream() dataStream.headers.Set(api.StreamType, api.StreamTypeData) complete, err := p.add(dataStream) if err != nil { @@ -145,7 +145,7 @@ func TestGetStreamPair(t *testing.T) { t.Fatalf("unexpected complete") } - errorStream := newFakeHttpStream() + errorStream := newFakeHTTPStream() errorStream.headers.Set(api.StreamType, api.StreamTypeError) complete, err = p.add(errorStream) if err != nil { @@ -188,7 +188,7 @@ func TestGetStreamPair(t *testing.T) { func TestRequestID(t *testing.T) { h := &httpStreamHandler{} - s := newFakeHttpStream() + s := newFakeHTTPStream() s.headers.Set(api.StreamType, api.StreamTypeError) s.id = 1 if e, a := "1", h.requestID(s); e != a { @@ -208,39 +208,39 @@ func TestRequestID(t *testing.T) { } } -type fakeHttpStream struct { +type fakeHTTPStream struct { headers http.Header id uint32 } -func newFakeHttpStream() *fakeHttpStream { - return &fakeHttpStream{ +func newFakeHTTPStream() *fakeHTTPStream { + return &fakeHTTPStream{ headers: make(http.Header), } } -var _ httpstream.Stream = &fakeHttpStream{} +var _ httpstream.Stream = &fakeHTTPStream{} -func (s *fakeHttpStream) Read(data []byte) (int, error) { +func (s *fakeHTTPStream) Read(data []byte) (int, error) { return 0, nil } -func (s *fakeHttpStream) Write(data []byte) (int, error) { +func (s *fakeHTTPStream) Write(data []byte) (int, error) { return 0, nil } -func (s *fakeHttpStream) Close() error { +func (s *fakeHTTPStream) Close() error { return nil } -func (s *fakeHttpStream) Reset() error { +func (s *fakeHTTPStream) Reset() error { return nil } -func (s *fakeHttpStream) Headers() http.Header { +func (s *fakeHTTPStream) Headers() http.Header { return s.headers } -func (s *fakeHttpStream) Identifier() uint32 { +func (s *fakeHTTPStream) Identifier() uint32 { return s.id } diff --git a/pkg/kubelet/server/portforward/portforward.go b/pkg/kubelet/server/portforward/portforward.go index 60a96e51a23..905fc8a7822 100644 --- a/pkg/kubelet/server/portforward/portforward.go +++ b/pkg/kubelet/server/portforward/portforward.go @@ -43,7 +43,7 @@ func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder Po if wsstream.IsWebSocketRequest(req) { err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout) } else { - err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout) + err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout) } if err != nil { diff --git a/pkg/kubelet/server/portforward/websocket.go b/pkg/kubelet/server/portforward/websocket.go index 1b23d74b519..e344f210dbb 100644 --- a/pkg/kubelet/server/portforward/websocket.go +++ b/pkg/kubelet/server/portforward/websocket.go @@ -43,15 +43,15 @@ const ( v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol ) -// options contains details about which streams are required for -// port forwarding. +// V4Options contains details about which streams are required for port +// forwarding. // All fields included in V4Options need to be expressed explicitly in the // CRI (pkg/kubelet/apis/cri/{version}/api.proto) PortForwardRequest. type V4Options struct { Ports []int32 } -// newOptions creates a new options from the Request. +// NewV4Options creates a new options from the Request. func NewV4Options(req *http.Request) (*V4Options, error) { if !wsstream.IsWebSocketRequest(req) { return &V4Options{}, nil diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 9471ff32e35..360b426033c 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -88,6 +88,7 @@ type Server struct { redirectContainerStreaming bool } +// TLSOptions holds the TLS options. type TLSOptions struct { Config *tls.Config CertFile string @@ -165,7 +166,7 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st klog.Fatal(server.ListenAndServe()) } -// ListenAndServePodResources initializes a grpc server to serve the PodResources service +// ListenAndServePodResources initializes a gRPC server to serve the PodResources service func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) { server := grpc.NewServer() podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider)) @@ -186,7 +187,7 @@ type AuthInterface interface { // HostInterface contains all the kubelet methods required by the server. // For testability. type HostInterface interface { - stats.StatsProvider + stats.Provider GetVersionInfo() (*cadvisorapi.VersionInfo, error) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) GetRunningPods() ([]*v1.Pod, error) @@ -533,7 +534,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re pod, ok := s.host.GetPodByName(podNamespace, podID) if !ok { - response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist\n", podID)) + response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist", podID)) return } // Check if containerName is valid. @@ -553,12 +554,12 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re } } if !containerExists { - response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q\n", containerName, podID)) + response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q", containerName, podID)) return } if _, ok := response.ResponseWriter.(http.Flusher); !ok { - response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs\n", reflect.TypeOf(response))) + response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response))) return } fw := flushwriter.Wrap(response.ResponseWriter) @@ -591,7 +592,7 @@ func (s *Server) getPods(request *restful.Request, response *restful.Response) { response.WriteError(http.StatusInternalServerError, err) return } - writeJsonResponse(response, data) + writeJSONResponse(response, data) } // getRunningPods returns a list of pods running on Kubelet. The list is @@ -608,7 +609,7 @@ func (s *Server) getRunningPods(request *restful.Request, response *restful.Resp response.WriteError(http.StatusInternalServerError, err) return } - writeJsonResponse(response, data) + writeJSONResponse(response, data) } // getLogs handles logs requests against the Kubelet. @@ -747,11 +748,11 @@ func (s *Server) getRun(request *restful.Request, response *restful.Response) { response.WriteError(http.StatusInternalServerError, err) return } - writeJsonResponse(response, data) + writeJSONResponse(response, data) } // Derived from go-restful writeJSON. -func writeJsonResponse(response *restful.Response, data []byte) { +func writeJSONResponse(response *restful.Response, data []byte) { if data == nil { response.WriteHeader(http.StatusOK) // do not write a nil representation @@ -834,7 +835,7 @@ func (a prometheusHostAdapter) GetMachineInfo() (*cadvisorapi.MachineInfo, error return a.host.GetCachedMachineInfo() } -func containerPrometheusLabelsFunc(s stats.StatsProvider) metrics.ContainerLabelsFunc { +func containerPrometheusLabelsFunc(s stats.Provider) metrics.ContainerLabelsFunc { // containerPrometheusLabels maps cAdvisor labels to prometheus labels. return func(c *cadvisorapi.ContainerInfo) map[string]string { // Prometheus requires that all metrics in the same family have the same labels, diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 4f4401f7a99..70410049b3c 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -115,7 +115,7 @@ func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) return fk.machineInfoFunc() } -func (_ *fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) { +func (*fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) { return &cadvisorapi.VersionInfo{}, nil } @@ -249,23 +249,23 @@ func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types } // Unused functions -func (_ *fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil } -func (_ *fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} } -func (_ *fakeKubelet) GetPodCgroupRoot() string { return "" } -func (_ *fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false } +func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil } +func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} } +func (*fakeKubelet) GetPodCgroupRoot() string { return "" } +func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false } func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) { return map[string]volume.Volume{}, true } -func (_ *fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil } -func (_ *fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil } -func (_ *fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil } -func (_ *fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil } -func (_ *fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil } -func (_ *fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) { +func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil } +func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil } +func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil } +func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil } +func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil } +func (*fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) { return nil, nil, nil } -func (_ *fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) { +func (*fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) { return nil, nil } @@ -744,7 +744,7 @@ func TestAuthFilters(t *testing.T) { The kubelet API has likely registered a handler for a new path. If the new path has a use case for partitioned authorization when requested from the kubelet API, add a specific subresource for it in auth.go#GetRequestAttributes() and in TestAuthFilters(). -Otherwise, add it to the expected list of paths that map to the "proxy" subresource in TestAuthFilters().`, path)) +Otherwise, add it to the expected list of paths that map to the "proxy" subresource in TestAuthFilters()`, path)) } } attributesGetter := NewNodeAuthorizerAttributesGetter(types.NodeName("test")) @@ -1553,9 +1553,8 @@ func TestServePortForward(t *testing.T) { if test.redirect { assert.Equal(t, http.StatusFound, resp.StatusCode, "status code") return - } else { - assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code") } + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code") conn, err := upgradeRoundTripper.NewConnection(resp) require.NoError(t, err, "creating streaming connection") diff --git a/pkg/kubelet/server/stats/fs_resource_analyzer.go b/pkg/kubelet/server/stats/fs_resource_analyzer.go index 331a0b27ba6..b09eafc1bb3 100644 --- a/pkg/kubelet/server/stats/fs_resource_analyzer.go +++ b/pkg/kubelet/server/stats/fs_resource_analyzer.go @@ -27,9 +27,7 @@ import ( "k8s.io/klog" ) -// Map to PodVolumeStats pointers since the addresses for map values are not constant and can cause pain -// if we need ever to get a pointer to one of the values (e.g. you can't) -type Cache map[types.UID]*volumeStatCalculator +type statCache map[types.UID]*volumeStatCalculator // fsResourceAnalyzerInterface is for embedding fs functions into ResourceAnalyzer type fsResourceAnalyzerInterface interface { @@ -38,7 +36,7 @@ type fsResourceAnalyzerInterface interface { // fsResourceAnalyzer provides stats about fs resource usage type fsResourceAnalyzer struct { - statsProvider StatsProvider + statsProvider Provider calcPeriod time.Duration cachedVolumeStats atomic.Value startOnce sync.Once @@ -47,12 +45,12 @@ type fsResourceAnalyzer struct { var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{} // newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation -func newFsResourceAnalyzer(statsProvider StatsProvider, calcVolumePeriod time.Duration) *fsResourceAnalyzer { +func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration) *fsResourceAnalyzer { r := &fsResourceAnalyzer{ statsProvider: statsProvider, calcPeriod: calcVolumePeriod, } - r.cachedVolumeStats.Store(make(Cache)) + r.cachedVolumeStats.Store(make(statCache)) return r } @@ -70,8 +68,8 @@ func (s *fsResourceAnalyzer) Start() { // updateCachedPodVolumeStats calculates and caches the PodVolumeStats for every Pod known to the kubelet. func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() { - oldCache := s.cachedVolumeStats.Load().(Cache) - newCache := make(Cache) + oldCache := s.cachedVolumeStats.Load().(statCache) + newCache := make(statCache) // Copy existing entries to new map, creating/starting new entries for pods missing from the cache for _, pod := range s.statsProvider.GetPods() { @@ -96,12 +94,12 @@ func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() { // GetPodVolumeStats returns the PodVolumeStats for a given pod. Results are looked up from a cache that // is eagerly populated in the background, and never calculated on the fly. func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) { - cache := s.cachedVolumeStats.Load().(Cache) - if statCalc, found := cache[uid]; !found { + cache := s.cachedVolumeStats.Load().(statCache) + statCalc, found := cache[uid] + if !found { // TODO: Differentiate between stats being empty // See issue #20679 return PodVolumeStats{}, false - } else { - return statCalc.GetLatest() } + return statCalc.GetLatest() } diff --git a/pkg/kubelet/server/stats/handler.go b/pkg/kubelet/server/stats/handler.go index f87e5c820c5..aaba690c421 100644 --- a/pkg/kubelet/server/stats/handler.go +++ b/pkg/kubelet/server/stats/handler.go @@ -37,8 +37,8 @@ import ( "k8s.io/kubernetes/pkg/volume" ) -// Host methods required by stats handlers. -type StatsProvider interface { +// Provider hosts methods required by stats handlers. +type Provider interface { // The following stats are provided by either CRI or cAdvisor. // // ListPodStats returns the stats of all the containers managed by pods. @@ -96,11 +96,12 @@ type StatsProvider interface { } type handler struct { - provider StatsProvider + provider Provider summaryProvider SummaryProvider } -func CreateHandlers(rootPath string, provider StatsProvider, summaryProvider SummaryProvider) *restful.WebService { +// CreateHandlers creates the REST handlers for the stats. +func CreateHandlers(rootPath string, provider Provider, summaryProvider SummaryProvider) *restful.WebService { h := &handler{provider, summaryProvider} ws := &restful.WebService{} @@ -130,7 +131,7 @@ func CreateHandlers(rootPath string, provider StatsProvider, summaryProvider Sum return ws } -type StatsRequest struct { +type statsRequest struct { // The name of the container for which to request stats. // Default: / // +optional @@ -158,7 +159,7 @@ type StatsRequest struct { Subcontainers bool `json:"subcontainers,omitempty"` } -func (r *StatsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest { +func (r *statsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest { return &cadvisorapi.ContainerInfoRequest{ NumStats: r.NumStats, Start: r.Start, @@ -166,9 +167,9 @@ func (r *StatsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest { } } -func parseStatsRequest(request *restful.Request) (StatsRequest, error) { +func parseStatsRequest(request *restful.Request) (statsRequest, error) { // Default request. - query := StatsRequest{ + query := statsRequest{ NumStats: 60, } diff --git a/pkg/kubelet/server/stats/resource_analyzer.go b/pkg/kubelet/server/stats/resource_analyzer.go index 5def6ba714d..bcddff0b3c4 100644 --- a/pkg/kubelet/server/stats/resource_analyzer.go +++ b/pkg/kubelet/server/stats/resource_analyzer.go @@ -37,7 +37,7 @@ type resourceAnalyzer struct { var _ ResourceAnalyzer = &resourceAnalyzer{} // NewResourceAnalyzer returns a new ResourceAnalyzer -func NewResourceAnalyzer(statsProvider StatsProvider, calVolumeFrequency time.Duration) ResourceAnalyzer { +func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration) ResourceAnalyzer { fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency) summaryProvider := NewSummaryProvider(statsProvider) return &resourceAnalyzer{fsAnalyzer, summaryProvider} diff --git a/pkg/kubelet/server/stats/summary.go b/pkg/kubelet/server/stats/summary.go index 93aef69ed87..29d3b31a35f 100644 --- a/pkg/kubelet/server/stats/summary.go +++ b/pkg/kubelet/server/stats/summary.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/util" ) +// SummaryProvider provides summaries of the stats from Kubelet. type SummaryProvider interface { // Get provides a new Summary with the stats from Kubelet, // and will update some stats if updateStats is true @@ -41,14 +42,14 @@ type summaryProviderImpl struct { // systemBootTime is the time at which the system was started systemBootTime metav1.Time - provider StatsProvider + provider Provider } var _ SummaryProvider = &summaryProviderImpl{} // NewSummaryProvider returns a SummaryProvider using the stats provided by the // specified statsProvider. -func NewSummaryProvider(statsProvider StatsProvider) SummaryProvider { +func NewSummaryProvider(statsProvider Provider) SummaryProvider { kubeletCreationTime := metav1.Now() bootTime, err := util.GetBootTime() if err != nil { diff --git a/pkg/kubelet/server/stats/volume_stat_calculator.go b/pkg/kubelet/server/stats/volume_stat_calculator.go index 220d8785f53..cd8ffdf8df8 100644 --- a/pkg/kubelet/server/stats/volume_stat_calculator.go +++ b/pkg/kubelet/server/stats/volume_stat_calculator.go @@ -32,7 +32,7 @@ import ( // volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result type volumeStatCalculator struct { - statsProvider StatsProvider + statsProvider Provider jitterPeriod time.Duration pod *v1.Pod stopChannel chan struct{} @@ -49,7 +49,7 @@ type PodVolumeStats struct { } // newVolumeStatCalculator creates a new VolumeStatCalculator -func newVolumeStatCalculator(statsProvider StatsProvider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator { +func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator { return &volumeStatCalculator{ statsProvider: statsProvider, jitterPeriod: jitterPeriod, @@ -79,11 +79,11 @@ func (s *volumeStatCalculator) StopOnce() *volumeStatCalculator { // getLatest returns the most recent PodVolumeStats from the cache func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) { - if result := s.latest.Load(); result == nil { + result := s.latest.Load() + if result == nil { return PodVolumeStats{}, false - } else { - return result.(PodVolumeStats), true } + return result.(PodVolumeStats), true } // calcAndStoreStats calculates PodVolumeStats for a given pod and writes the result to the s.latest cache. @@ -102,8 +102,8 @@ func (s *volumeStatCalculator) calcAndStoreStats() { } // Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats - ephemeralStats := []stats.VolumeStats{} - persistentStats := []stats.VolumeStats{} + var ephemeralStats []stats.VolumeStats + var persistentStats []stats.VolumeStats for name, v := range volumes { metric, err := v.GetMetrics() if err != nil { diff --git a/pkg/kubelet/server/streaming/errors.go b/pkg/kubelet/server/streaming/errors.go index 62bcef79000..bc56f43fc8c 100644 --- a/pkg/kubelet/server/streaming/errors.go +++ b/pkg/kubelet/server/streaming/errors.go @@ -17,7 +17,6 @@ limitations under the License. package streaming import ( - "fmt" "net/http" "strconv" @@ -26,16 +25,17 @@ import ( "google.golang.org/grpc/status" ) -func ErrorStreamingDisabled(method string) error { - return status.Errorf(codes.NotFound, fmt.Sprintf("streaming method %s disabled", method)) +// NewErrorStreamingDisabled creates an error for disabled streaming method. +func NewErrorStreamingDisabled(method string) error { + return status.Errorf(codes.NotFound, "streaming method %s disabled", method) } -// The error returned when the maximum number of in-flight requests is exceeded. -func ErrorTooManyInFlight() error { - return status.Errorf(codes.ResourceExhausted, "maximum number of in-flight requests exceeded") +// NewErrorTooManyInFlight creates an error for exceeding the maximum number of in-flight requests. +func NewErrorTooManyInFlight() error { + return status.Error(codes.ResourceExhausted, "maximum number of in-flight requests exceeded") } -// Translates a CRI streaming error into an appropriate HTTP response. +// WriteError translates a CRI streaming error into an appropriate HTTP response. func WriteError(err error, w http.ResponseWriter) error { var status int switch grpc.Code(err) { @@ -43,9 +43,9 @@ func WriteError(err error, w http.ResponseWriter) error { status = http.StatusNotFound case codes.ResourceExhausted: // We only expect to hit this if there is a DoS, so we just wait the full TTL. - // If this is ever hit in steady-state operations, consider increasing the MaxInFlight requests, + // If this is ever hit in steady-state operations, consider increasing the maxInFlight requests, // or plumbing through the time to next expiration. - w.Header().Set("Retry-After", strconv.Itoa(int(CacheTTL.Seconds()))) + w.Header().Set("Retry-After", strconv.Itoa(int(cacheTTL.Seconds()))) status = http.StatusTooManyRequests default: status = http.StatusInternalServerError diff --git a/pkg/kubelet/server/streaming/request_cache.go b/pkg/kubelet/server/streaming/request_cache.go index f68f640be0b..32f9bf58fd6 100644 --- a/pkg/kubelet/server/streaming/request_cache.go +++ b/pkg/kubelet/server/streaming/request_cache.go @@ -29,12 +29,12 @@ import ( ) var ( - // Timeout after which tokens become invalid. - CacheTTL = 1 * time.Minute - // The maximum number of in-flight requests to allow. - MaxInFlight = 1000 - // Length of the random base64 encoded token identifying the request. - TokenLen = 8 + // cacheTTL is the timeout after which tokens become invalid. + cacheTTL = 1 * time.Minute + // maxInFlight is the maximum number of in-flight requests to allow. + maxInFlight = 1000 + // tokenLen is the length of the random base64 encoded token identifying the request. + tokenLen = 8 ) // requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use @@ -77,14 +77,14 @@ func (c *requestCache) Insert(req request) (token string, err error) { // Remove expired entries. c.gc() // If the cache is full, reject the request. - if c.ll.Len() == MaxInFlight { - return "", ErrorTooManyInFlight() + if c.ll.Len() == maxInFlight { + return "", NewErrorTooManyInFlight() } token, err = c.uniqueToken() if err != nil { return "", err } - ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(CacheTTL)}) + ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(cacheTTL)}) c.tokens[token] = ele return token, nil @@ -112,15 +112,15 @@ func (c *requestCache) Consume(token string) (req request, found bool) { // uniqueToken generates a random URL-safe token and ensures uniqueness. func (c *requestCache) uniqueToken() (string, error) { const maxTries = 10 - // Number of bytes to be TokenLen when base64 encoded. - tokenSize := math.Ceil(float64(TokenLen) * 6 / 8) + // Number of bytes to be tokenLen when base64 encoded. + tokenSize := math.Ceil(float64(tokenLen) * 6 / 8) rawToken := make([]byte, int(tokenSize)) for i := 0; i < maxTries; i++ { if _, err := rand.Read(rawToken); err != nil { return "", err } encoded := base64.RawURLEncoding.EncodeToString(rawToken) - token := encoded[:TokenLen] + token := encoded[:tokenLen] // If it's unique, return it. Otherwise retry. if _, exists := c.tokens[encoded]; !exists { return token, nil diff --git a/pkg/kubelet/server/streaming/request_cache_test.go b/pkg/kubelet/server/streaming/request_cache_test.go index 6ce432945bb..79edaeff44e 100644 --- a/pkg/kubelet/server/streaming/request_cache_test.go +++ b/pkg/kubelet/server/streaming/request_cache_test.go @@ -35,22 +35,22 @@ func TestInsert(t *testing.T) { // Insert normal oldestTok, err := c.Insert(nextRequest()) require.NoError(t, err) - assert.Len(t, oldestTok, TokenLen) + assert.Len(t, oldestTok, tokenLen) assertCacheSize(t, c, 1) // Insert until full - for i := 0; i < MaxInFlight-2; i++ { + for i := 0; i < maxInFlight-2; i++ { tok, err := c.Insert(nextRequest()) require.NoError(t, err) - assert.Len(t, tok, TokenLen) + assert.Len(t, tok, tokenLen) } - assertCacheSize(t, c, MaxInFlight-1) + assertCacheSize(t, c, maxInFlight-1) newestReq := nextRequest() newestTok, err := c.Insert(newestReq) require.NoError(t, err) - assert.Len(t, newestTok, TokenLen) - assertCacheSize(t, c, MaxInFlight) + assert.Len(t, newestTok, tokenLen) + assertCacheSize(t, c, maxInFlight) require.Contains(t, c.tokens, oldestTok, "oldest request should still be cached") // Consume newest token. @@ -62,8 +62,8 @@ func TestInsert(t *testing.T) { // Insert again (still full) tok, err := c.Insert(nextRequest()) require.NoError(t, err) - assert.Len(t, tok, TokenLen) - assertCacheSize(t, c, MaxInFlight) + assert.Len(t, tok, tokenLen) + assertCacheSize(t, c, maxInFlight) // Insert again (should evict) _, err = c.Insert(nextRequest()) @@ -71,9 +71,9 @@ func TestInsert(t *testing.T) { errResponse := httptest.NewRecorder() require.NoError(t, WriteError(err, errResponse)) assert.Equal(t, errResponse.Code, http.StatusTooManyRequests) - assert.Equal(t, strconv.Itoa(int(CacheTTL.Seconds())), errResponse.HeaderMap.Get("Retry-After")) + assert.Equal(t, strconv.Itoa(int(cacheTTL.Seconds())), errResponse.HeaderMap.Get("Retry-After")) - assertCacheSize(t, c, MaxInFlight) + assertCacheSize(t, c, maxInFlight) _, ok = c.Consume(oldestTok) assert.True(t, ok, "oldest request should be valid") } @@ -142,7 +142,7 @@ func TestConsume(t *testing.T) { require.NoError(t, err) assertCacheSize(t, c, 1) - clock.Step(2 * CacheTTL) + clock.Step(2 * cacheTTL) _, ok := c.Consume(tok) assert.False(t, ok) @@ -167,7 +167,7 @@ func TestGC(t *testing.T) { // expired: tok1, tok2 // non-expired: tok3, tok4 - clock.Step(2 * CacheTTL) + clock.Step(2 * cacheTTL) tok3, err := c.Insert(nextRequest()) require.NoError(t, err) assertCacheSize(t, c, 1) @@ -186,14 +186,14 @@ func TestGC(t *testing.T) { assert.True(t, ok) // When full, nothing is expired. - for i := 0; i < MaxInFlight; i++ { + for i := 0; i < maxInFlight; i++ { _, err := c.Insert(nextRequest()) require.NoError(t, err) } - assertCacheSize(t, c, MaxInFlight) + assertCacheSize(t, c, maxInFlight) // When everything is expired - clock.Step(2 * CacheTTL) + clock.Step(2 * cacheTTL) _, err = c.Insert(nextRequest()) require.NoError(t, err) assertCacheSize(t, c, 1) diff --git a/pkg/kubelet/server/streaming/server.go b/pkg/kubelet/server/streaming/server.go index 7cbc424c41e..8bdbda79758 100644 --- a/pkg/kubelet/server/streaming/server.go +++ b/pkg/kubelet/server/streaming/server.go @@ -39,7 +39,7 @@ import ( remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" ) -// The library interface to serve the stream requests. +// Server is the library interface to serve the stream requests. type Server interface { http.Handler @@ -59,7 +59,7 @@ type Server interface { Stop() error } -// The interface to execute the commands and provide the streams. +// Runtime is the interface to execute the commands and provide the streams. type Runtime interface { Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error @@ -103,6 +103,7 @@ var DefaultConfig = Config{ SupportedPortForwardProtocols: portforward.SupportedProtocols, } +// NewServer creates a new Server for stream requests. // TODO(tallclair): Add auth(n/z) interface & handling. func NewServer(config Config, runtime Runtime) (Server, error) { s := &server{ @@ -243,9 +244,8 @@ func (s *server) Start(stayUp bool) error { s.config.BaseURL.Host = listener.Addr().String() if s.config.TLSConfig != nil { return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig. - } else { - return s.server.Serve(listener) } + return s.server.Serve(listener) } func (s *server) Stop() error {