Merge pull request #19741 from pwittrock/syncfsmetrics

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-05 17:54:22 -08:00
commit fcf9c4a1e4
24 changed files with 2030 additions and 1130 deletions

View File

@ -69,6 +69,7 @@ func NewKubeletServer() *KubeletServer {
KubeletConfiguration: componentconfig.KubeletConfiguration{ KubeletConfiguration: componentconfig.KubeletConfiguration{
Address: "0.0.0.0", Address: "0.0.0.0",
CAdvisorPort: 4194, CAdvisorPort: 4194,
VolumeStatsAggPeriod: unversioned.Duration{time.Minute},
CertDirectory: "/var/run/kubernetes", CertDirectory: "/var/run/kubernetes",
CgroupRoot: "", CgroupRoot: "",
ConfigureCBR0: false, ConfigureCBR0: false,
@ -182,6 +183,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.ImageGCHighThresholdPercent, "image-gc-high-threshold", s.ImageGCHighThresholdPercent, "The percent of disk usage after which image garbage collection is always run. Default: 90%") fs.IntVar(&s.ImageGCHighThresholdPercent, "image-gc-high-threshold", s.ImageGCHighThresholdPercent, "The percent of disk usage after which image garbage collection is always run. Default: 90%")
fs.IntVar(&s.ImageGCLowThresholdPercent, "image-gc-low-threshold", s.ImageGCLowThresholdPercent, "The percent of disk usage before which image garbage collection is never run. Lowest disk usage to garbage collect to. Default: 80%") fs.IntVar(&s.ImageGCLowThresholdPercent, "image-gc-low-threshold", s.ImageGCLowThresholdPercent, "The percent of disk usage before which image garbage collection is never run. Lowest disk usage to garbage collect to. Default: 80%")
fs.IntVar(&s.LowDiskSpaceThresholdMB, "low-diskspace-threshold-mb", s.LowDiskSpaceThresholdMB, "The absolute free disk space, in MB, to maintain. When disk space falls below this threshold, new pods would be rejected. Default: 256") fs.IntVar(&s.LowDiskSpaceThresholdMB, "low-diskspace-threshold-mb", s.LowDiskSpaceThresholdMB, "The absolute free disk space, in MB, to maintain. When disk space falls below this threshold, new pods would be rejected. Default: 256")
fs.DurationVar(&s.VolumeStatsAggPeriod.Duration, "volume-stats-agg-period", s.VolumeStatsAggPeriod.Duration, "Specifies interval for kubelet to calculate and cache the volume disk usage for all pods and volumes. To disable volume calculations, set to 0. Default: '1m'")
fs.StringVar(&s.NetworkPluginName, "network-plugin", s.NetworkPluginName, "<Warning: Alpha feature> The name of the network plugin to be invoked for various events in kubelet/pod lifecycle") fs.StringVar(&s.NetworkPluginName, "network-plugin", s.NetworkPluginName, "<Warning: Alpha feature> The name of the network plugin to be invoked for various events in kubelet/pod lifecycle")
fs.StringVar(&s.NetworkPluginDir, "network-plugin-dir", s.NetworkPluginDir, "<Warning: Alpha feature> The full path of the directory in which to search for network plugins") fs.StringVar(&s.NetworkPluginDir, "network-plugin-dir", s.NetworkPluginDir, "<Warning: Alpha feature> The full path of the directory in which to search for network plugins")
fs.StringVar(&s.VolumePluginDir, "volume-plugin-dir", s.VolumePluginDir, "<Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins") fs.StringVar(&s.VolumePluginDir, "volume-plugin-dir", s.VolumePluginDir, "<Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins")

View File

@ -180,6 +180,7 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
AllowPrivileged: s.AllowPrivileged, AllowPrivileged: s.AllowPrivileged,
Auth: nil, // default does not enforce auth[nz] Auth: nil, // default does not enforce auth[nz]
CAdvisorInterface: nil, // launches background processes, not set here CAdvisorInterface: nil, // launches background processes, not set here
VolumeStatsAggPeriod: s.VolumeStatsAggPeriod.Duration,
CgroupRoot: s.CgroupRoot, CgroupRoot: s.CgroupRoot,
Cloud: nil, // cloud provider might start background processes Cloud: nil, // cloud provider might start background processes
ClusterDNS: net.ParseIP(s.ClusterDNS), ClusterDNS: net.ParseIP(s.ClusterDNS),
@ -481,6 +482,7 @@ func SimpleKubelet(client *clientset.Clientset,
kcfg := KubeletConfig{ kcfg := KubeletConfig{
Address: net.ParseIP(address), Address: net.ParseIP(address),
CAdvisorInterface: cadvisorInterface, CAdvisorInterface: cadvisorInterface,
VolumeStatsAggPeriod: time.Minute,
CgroupRoot: "", CgroupRoot: "",
Cloud: cloud, Cloud: cloud,
ClusterDNS: clusterDNS, ClusterDNS: clusterDNS,
@ -654,6 +656,7 @@ type KubeletConfig struct {
Auth server.AuthInterface Auth server.AuthInterface
Builder KubeletBuilder Builder KubeletBuilder
CAdvisorInterface cadvisor.Interface CAdvisorInterface cadvisor.Interface
VolumeStatsAggPeriod time.Duration
CgroupRoot string CgroupRoot string
Cloud cloudprovider.Interface Cloud cloudprovider.Interface
ClusterDNS net.IP ClusterDNS net.IP
@ -815,6 +818,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.NodeIP, kc.NodeIP,
kc.Reservation, kc.Reservation,
kc.EnableCustomMetrics, kc.EnableCustomMetrics,
kc.VolumeStatsAggPeriod,
) )
if err != nil { if err != nil {

View File

@ -145,9 +145,10 @@ kubelet
--tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to the directory passed to --cert-dir. --tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to the directory passed to --cert-dir.
--tls-private-key-file="": File containing x509 private key matching --tls-cert-file. --tls-private-key-file="": File containing x509 private key matching --tls-cert-file.
--volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": <Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins --volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": <Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins
--volume-stats-agg-period=1m0s: Specifies interval for kubelet to calculate and cache the volume disk usage for all pods and volumes. To disable volume calculations, set to 0. Default: '1m'
``` ```
###### Auto generated by spf13/cobra on 3-Feb-2016 ###### Auto generated by spf13/cobra on 5-Feb-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS --> <!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -363,6 +363,7 @@ use-kubernetes-cluster-service
user-whitelist user-whitelist
verify-only verify-only
volume-plugin-dir volume-plugin-dir
volume-stats-agg-period
watch-cache watch-cache
watch-only watch-only
whitelist-override-label whitelist-override-label

File diff suppressed because it is too large Load Diff

View File

@ -205,6 +205,8 @@ type KubeletConfiguration struct {
// maintain. When disk space falls below this threshold, new pods would // maintain. When disk space falls below this threshold, new pods would
// be rejected. // be rejected.
LowDiskSpaceThresholdMB int `json:"lowDiskSpaceThresholdMB"` LowDiskSpaceThresholdMB int `json:"lowDiskSpaceThresholdMB"`
// How frequently to calculate and cache volume disk usage for all pods
VolumeStatsAggPeriod unversioned.Duration `json:volumeStatsAggPeriod`
// networkPluginName is the name of the network plugin to be invoked for // networkPluginName is the name of the network plugin to be invoked for
// various events in kubelet/pod lifecycle // various events in kubelet/pod lifecycle
NetworkPluginName string `json:"networkPluginName"` NetworkPluginName string `json:"networkPluginName"`

View File

@ -60,6 +60,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/server" "k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
@ -203,6 +204,7 @@ func NewMainKubelet(
nodeIP net.IP, nodeIP net.IP,
reservation kubetypes.Reservation, reservation kubetypes.Reservation,
enableCustomMetrics bool, enableCustomMetrics bool,
volumeStatsAggPeriod time.Duration,
) (*Kubelet, error) { ) (*Kubelet, error) {
if rootDirectory == "" { if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory) return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
@ -329,6 +331,9 @@ func NewMainKubelet(
reservation: reservation, reservation: reservation,
enableCustomMetrics: enableCustomMetrics, enableCustomMetrics: enableCustomMetrics,
} }
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, volumeStatsAggPeriod)
if klet.flannelExperimentalOverlay { if klet.flannelExperimentalOverlay {
glog.Infof("Flannel is in charge of podCIDR and overlay networking.") glog.Infof("Flannel is in charge of podCIDR and overlay networking.")
} }
@ -610,6 +615,9 @@ type Kubelet struct {
// Watcher of out of memory events. // Watcher of out of memory events.
oomWatcher OOMWatcher oomWatcher OOMWatcher
// Monitor resource usage
resourceAnalyzer stats.ResourceAnalyzer
// If non-empty, pass this to the container runtime as the root cgroup. // If non-empty, pass this to the container runtime as the root cgroup.
cgroupRoot string cgroupRoot string
@ -937,6 +945,9 @@ func (kl *Kubelet) initializeModules() error {
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil { if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("Failed to start OOM watcher %v", err) return fmt.Errorf("Failed to start OOM watcher %v", err)
} }
// Step 7: Start resource analyzer
kl.resourceAnalyzer.Start()
return nil return nil
} }
@ -3459,11 +3470,11 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
} }
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) { func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
server.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers) server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers)
} }
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, address, port) server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
} }
// GetRuntime returns the current Runtime implementation in use by the kubelet. This func // GetRuntime returns the current Runtime implementation in use by the kubelet. This func

View File

@ -37,6 +37,7 @@ const (
PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds" PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds"
PLEGRelistLatencyKey = "pleg_relist_latency_microseconds" PLEGRelistLatencyKey = "pleg_relist_latency_microseconds"
PLEGRelistIntervalKey = "pleg_relist_interval_microseconds" PLEGRelistIntervalKey = "pleg_relist_interval_microseconds"
MetricsVolumeCalcLatencyKey = "metrics_volume_calc_microseconds"
) )
var ( var (
@ -121,6 +122,13 @@ var (
Help: "Interval in microseconds between relisting in PLEG.", Help: "Interval in microseconds between relisting in PLEG.",
}, },
) )
MetricsVolumeCalcLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: MetricsVolumeCalcLatencyKey,
Help: "Latency in microseconds for calculating volume metrics.",
},
)
) )
var registerMetrics sync.Once var registerMetrics sync.Once
@ -141,6 +149,7 @@ func Register(containerCache kubecontainer.RuntimeCache) {
prometheus.MustRegister(newPodAndContainerCollector(containerCache)) prometheus.MustRegister(newPodAndContainerCollector(containerCache))
prometheus.MustRegister(PLEGRelistLatency) prometheus.MustRegister(PLEGRelistLatency)
prometheus.MustRegister(PLEGRelistInterval) prometheus.MustRegister(PLEGRelistInterval)
prometheus.MustRegister(MetricsVolumeCalcLatency)
}) })
} }

View File

@ -58,13 +58,15 @@ import (
"k8s.io/kubernetes/pkg/util/limitwriter" "k8s.io/kubernetes/pkg/util/limitwriter"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wsstream" "k8s.io/kubernetes/pkg/util/wsstream"
"k8s.io/kubernetes/pkg/volume"
) )
// Server is a http.Handler which exposes kubelet functionality over HTTP. // Server is a http.Handler which exposes kubelet functionality over HTTP.
type Server struct { type Server struct {
auth AuthInterface auth AuthInterface
host HostInterface host HostInterface
restfulCont containerInterface restfulCont containerInterface
resourceAnalyzer stats.ResourceAnalyzer
} }
type TLSOptions struct { type TLSOptions struct {
@ -102,9 +104,9 @@ func (a *filteringContainer) RegisteredHandlePaths() []string {
} }
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet. // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletServer(host HostInterface, address net.IP, port uint, tlsOptions *TLSOptions, auth AuthInterface, enableDebuggingHandlers bool) { func ListenAndServeKubeletServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, tlsOptions *TLSOptions, auth AuthInterface, enableDebuggingHandlers bool) {
glog.Infof("Starting to listen on %s:%d", address, port) glog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, auth, enableDebuggingHandlers) handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers)
s := &http.Server{ s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler, Handler: &handler,
@ -119,9 +121,9 @@ func ListenAndServeKubeletServer(host HostInterface, address net.IP, port uint,
} }
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, address net.IP, port uint) { func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint) {
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port) glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
s := NewServer(host, nil, false) s := NewServer(host, resourceAnalyzer, nil, false)
server := &http.Server{ server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
@ -162,14 +164,16 @@ type HostInterface interface {
LatestLoopEntryTime() time.Time LatestLoopEntryTime() time.Time
DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error) DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error)
RootFsInfo() (cadvisorapiv2.FsInfo, error) RootFsInfo() (cadvisorapiv2.FsInfo, error)
ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
} }
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests. // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
func NewServer(host HostInterface, auth AuthInterface, enableDebuggingHandlers bool) Server { func NewServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, auth AuthInterface, enableDebuggingHandlers bool) Server {
server := Server{ server := Server{
host: host, host: host,
auth: auth, resourceAnalyzer: resourceAnalyzer,
restfulCont: &filteringContainer{Container: restful.NewContainer()}, auth: auth,
restfulCont: &filteringContainer{Container: restful.NewContainer()},
} }
if auth != nil { if auth != nil {
server.InstallAuthFilter() server.InstallAuthFilter()
@ -229,7 +233,7 @@ func (s *Server) InstallDefaultHandlers() {
Operation("getPods")) Operation("getPods"))
s.restfulCont.Add(ws) s.restfulCont.Add(ws)
s.restfulCont.Add(stats.CreateHandlers(s.host)) s.restfulCont.Add(stats.CreateHandlers(s.host, s.resourceAnalyzer))
s.restfulCont.Handle("/metrics", prometheus.Handler()) s.restfulCont.Handle("/metrics", prometheus.Handler())
ws = new(restful.WebService) ws = new(restful.WebService)

View File

@ -41,11 +41,13 @@ import (
"k8s.io/kubernetes/pkg/auth/user" "k8s.io/kubernetes/pkg/auth/user"
"k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/volume"
) )
type fakeKubelet struct { type fakeKubelet struct {
@ -147,6 +149,10 @@ func (_ *fakeKubelet) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
func (_ *fakeKubelet) GetNode() (*api.Node, error) { return nil, nil } func (_ *fakeKubelet) GetNode() (*api.Node, error) { return nil, nil }
func (_ *fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} } func (_ *fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
return map[string]volume.Volume{}, true
}
type fakeAuth struct { type fakeAuth struct {
authenticateFunc func(*http.Request) (user.Info, bool, error) authenticateFunc func(*http.Request) (user.Info, bool, error)
attributesFunc func(user.Info, *http.Request) authorizer.Attributes attributesFunc func(user.Info, *http.Request) authorizer.Attributes
@ -196,7 +202,11 @@ func newServerTest() *serverTestFramework {
return nil return nil
}, },
} }
server := NewServer(fw.fakeKubelet, fw.fakeAuth, true) server := NewServer(
fw.fakeKubelet,
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
fw.fakeAuth,
true)
fw.serverUnderTest = &server fw.serverUnderTest = &server
// TODO: Close() this when fix #19254 // TODO: Close() this when fix #19254
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)

View File

@ -0,0 +1,153 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"sync/atomic"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/volume"
"github.com/golang/glog"
)
// Map to PodVolumeStats pointers since the addresses for map values are not constant and can cause pain
// if we need ever to get a pointer to one of the values (e.g. you can't)
type Cache map[types.UID]*PodVolumeStats
// PodVolumeStats encapsulates all VolumeStats for a pod
type PodVolumeStats struct {
Volumes []VolumeStats
}
// fsResourceAnalyzerInterface is for embedding fs functions into ResourceAnalyzer
type fsResourceAnalyzerInterface interface {
GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool)
}
// diskResourceAnalyzer provider stats about fs resource usage
type fsResourceAnalyzer struct {
statsProvider StatsProvider
calcVolumePeriod time.Duration
cachedVolumeStats atomic.Value
}
var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
// newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation
func newFsResourceAnalyzer(statsProvider StatsProvider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
return &fsResourceAnalyzer{
statsProvider: statsProvider,
calcVolumePeriod: calcVolumePeriod,
}
}
// Start eager background caching of volume stats.
func (s *fsResourceAnalyzer) Start() {
if s.calcVolumePeriod <= 0 {
glog.Info("Volume stats collection disabled.")
return
}
glog.Info("Starting FS ResourceAnalyzer")
go util.Forever(func() {
startTime := time.Now()
s.updateCachedPodVolumeStats()
glog.V(3).Infof("Finished calculating volume stats in %v.", time.Now().Sub(startTime))
metrics.MetricsVolumeCalcLatency.Observe(metrics.SinceInMicroseconds(startTime))
}, s.calcVolumePeriod)
}
// updateCachedPodVolumeStats calculates and caches the PodVolumeStats for every Pod known to the kubelet.
func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
// Calculate the new volume stats map
pods := s.statsProvider.GetPods()
newCache := make(Cache)
// TODO: Prevent 1 pod metrics hanging from blocking other pods. Schedule pods independently and spaced
// evenly across the period to prevent cpu spikes. Ideally resource collection consumes the resources
// allocated to the pod itself to isolate bad actors.
// See issue #20675
for _, pod := range pods {
podUid := pod.GetUID()
stats, found := s.getPodVolumeStats(pod)
if !found {
glog.Warningf("Could not locate volumes for pod %s", format.Pod(pod))
continue
}
newCache[podUid] = &stats
}
// Update the cache reference
s.cachedVolumeStats.Store(newCache)
}
// getPodVolumeStats calculates PodVolumeStats for a given pod and returns the result.
func (s *fsResourceAnalyzer) getPodVolumeStats(pod *api.Pod) (PodVolumeStats, bool) {
// Find all Volumes for the Pod
volumes, found := s.statsProvider.ListVolumesForPod(pod.UID)
if !found {
return PodVolumeStats{}, found
}
// Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats
stats := make([]VolumeStats, 0, len(volumes))
for name, v := range volumes {
metric, err := v.GetMetrics()
if err != nil {
// Expected for Volumes that don't support Metrics
// TODO: Disambiguate unsupported from errors
// See issue #20676
glog.V(4).Infof("Failed to calculate volume metrics for pod %s volume %s: %+v",
format.Pod(pod), name, err)
continue
}
stats = append(stats, s.parsePodVolumeStats(name, metric))
}
return PodVolumeStats{Volumes: stats}, true
}
func (s *fsResourceAnalyzer) parsePodVolumeStats(podName string, metric *volume.Metrics) VolumeStats {
available := uint64(metric.Available.Value())
capacity := uint64(metric.Capacity.Value())
used := uint64((metric.Used.Value()))
return VolumeStats{
Name: podName,
FsStats: FsStats{
AvailableBytes: &available,
CapacityBytes: &capacity,
UsedBytes: &used}}
}
// GetPodVolumeStats returns the PodVolumeStats for a given pod. Results are looked up from a cache that
// is eagerly populated in the background, and never calculated on the fly.
func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) {
// Cache hasn't been initialized yet
if s.cachedVolumeStats.Load() == nil {
return PodVolumeStats{}, false
}
cache := s.cachedVolumeStats.Load().(Cache)
stats, f := cache[uid]
if !f {
// TODO: Differentiate between stats being empty
// See issue #20679
return PodVolumeStats{}, false
}
return *stats, true
}

View File

@ -0,0 +1,178 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"github.com/stretchr/testify/assert"
)
// TestGetPodVolumeStats tests that GetPodVolumeStats reads from the cache and returns the value
func TestGetPodVolumeStats(t *testing.T) {
instance := newFsResourceAnalyzer(&MockStatsProvider{}, time.Minute*5)
stats, found := instance.GetPodVolumeStats("testpod1")
assert.False(t, found)
assert.Equal(t, PodVolumeStats{}, stats)
instance.cachedVolumeStats.Store(make(Cache))
stats, found = instance.GetPodVolumeStats("testpod1")
assert.False(t, found)
assert.Equal(t, PodVolumeStats{}, stats)
available := uint64(100)
used := uint64(200)
capacity := uint64(400)
vs1 := VolumeStats{
Name: "vol1",
FsStats: FsStats{
AvailableBytes: &available,
UsedBytes: &used,
CapacityBytes: &capacity,
},
}
pvs := &PodVolumeStats{
Volumes: []VolumeStats{vs1},
}
instance.cachedVolumeStats.Load().(Cache)["testpod1"] = pvs
stats, found = instance.GetPodVolumeStats("testpod1")
assert.True(t, found)
assert.Equal(t, *pvs, stats)
}
// TestUpdateCachedPodVolumeStats tests that the cache is updated from the stats provider
func TestUpdateCachedPodVolumeStats(t *testing.T) {
statsPr := &MockStatsProvider{}
instance := newFsResourceAnalyzer(statsPr, time.Minute*5)
// Mock retrieving pods
pods := []*api.Pod{
{ObjectMeta: api.ObjectMeta{UID: "testpod1"}},
{ObjectMeta: api.ObjectMeta{UID: "testpod2"}},
}
statsPr.On("GetPods").Return(pods)
// Mock volumes for pod1
m1 := &volume.Metrics{
Available: resource.NewQuantity(100, resource.DecimalSI),
Used: resource.NewQuantity(200, resource.DecimalSI),
Capacity: resource.NewQuantity(400, resource.DecimalSI),
}
v1 := &volume.MockVolume{}
v1.On("GetMetrics").Return(m1, nil)
m2 := &volume.Metrics{
Available: resource.NewQuantity(600, resource.DecimalSI),
Used: resource.NewQuantity(700, resource.DecimalSI),
Capacity: resource.NewQuantity(1400, resource.DecimalSI),
}
v2 := &volume.MockVolume{}
v2.On("GetMetrics").Return(m2, nil)
tp1Volumes := map[string]volume.Volume{
"v1": v1,
"v2": v2,
}
statsPr.On("ListVolumesForPod", types.UID("testpod1")).Return(tp1Volumes, true)
// Mock volumes for pod2
m3 := &volume.Metrics{
Available: resource.NewQuantity(800, resource.DecimalSI),
Used: resource.NewQuantity(900, resource.DecimalSI),
Capacity: resource.NewQuantity(1800, resource.DecimalSI),
}
v3 := &volume.MockVolume{}
v3.On("GetMetrics").Return(m3, nil)
v4 := &volume.MockVolume{}
v4.On("GetMetrics").Return(nil, fmt.Errorf("Error calculating stats"))
tp2Volumes := map[string]volume.Volume{
"v3": v3,
"v4": v4,
}
statsPr.On("ListVolumesForPod", types.UID("testpod2")).Return(tp2Volumes, true)
instance.updateCachedPodVolumeStats()
actual1, found := instance.GetPodVolumeStats("testpod1")
assert.True(t, found)
assert.Len(t, actual1.Volumes, 2)
v1available := uint64(100)
v1used := uint64(200)
v1capacity := uint64(400)
assert.Contains(t, actual1.Volumes, VolumeStats{
Name: "v1",
FsStats: FsStats{
AvailableBytes: &v1available,
UsedBytes: &v1used,
CapacityBytes: &v1capacity,
},
})
v2available := uint64(600)
v2used := uint64(700)
v2capacity := uint64(1400)
assert.Contains(t, actual1.Volumes, VolumeStats{
Name: "v2",
FsStats: FsStats{
AvailableBytes: &v2available,
UsedBytes: &v2used,
CapacityBytes: &v2capacity,
},
})
v3available := uint64(800)
v3used := uint64(900)
v3capacity := uint64(1800)
actual2, found := instance.GetPodVolumeStats("testpod2")
assert.True(t, found)
assert.Len(t, actual2.Volumes, 1)
assert.Contains(t, actual2.Volumes, VolumeStats{
Name: "v3",
FsStats: FsStats{
AvailableBytes: &v3available,
UsedBytes: &v3used,
CapacityBytes: &v3capacity,
},
})
// Make sure the cache gets updated. The mocking libraries have trouble
pods = []*api.Pod{
{ObjectMeta: api.ObjectMeta{UID: "testpod3"}},
}
statsPr.On("GetPods").Return(pods)
// pod3 volumes
m1 = &volume.Metrics{
Available: resource.NewQuantity(150, resource.DecimalSI),
Used: resource.NewQuantity(200, resource.DecimalSI),
Capacity: resource.NewQuantity(600, resource.DecimalSI),
}
v1 = &volume.MockVolume{}
v1.On("GetMetrics").Return(m1, nil)
tp1Volumes = map[string]volume.Volume{
"v1": v1,
}
statsPr.On("ListVolumesForPod", types.UID("testpod3")).Return(tp1Volumes, true)
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/volume"
) )
// Host methods required by stats handlers. // Host methods required by stats handlers.
@ -45,6 +46,8 @@ type StatsProvider interface {
GetNodeConfig() cm.NodeConfig GetNodeConfig() cm.NodeConfig
DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error) DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error)
RootFsInfo() (cadvisorapiv2.FsInfo, error) RootFsInfo() (cadvisorapiv2.FsInfo, error)
ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
GetPods() []*api.Pod
} }
type handler struct { type handler struct {
@ -52,8 +55,8 @@ type handler struct {
summaryProvider SummaryProvider summaryProvider SummaryProvider
} }
func CreateHandlers(provider StatsProvider) *restful.WebService { func CreateHandlers(provider StatsProvider, resourceAnalyzer ResourceAnalyzer) *restful.WebService {
h := &handler{provider, NewSummaryProvider(provider)} h := &handler{provider, NewSummaryProvider(provider, resourceAnalyzer)}
ws := &restful.WebService{} ws := &restful.WebService{}
ws.Path("/stats/"). ws.Path("/stats/").

View File

@ -0,0 +1,244 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import "github.com/stretchr/testify/mock"
import cadvisorapi "github.com/google/cadvisor/info/v1"
import cadvisorapiv2 "github.com/google/cadvisor/info/v2"
import "k8s.io/kubernetes/pkg/api"
import "k8s.io/kubernetes/pkg/kubelet/cm"
import "k8s.io/kubernetes/pkg/types"
import "k8s.io/kubernetes/pkg/volume"
// DO NOT EDIT
// GENERATED BY mockery
type MockStatsProvider struct {
mock.Mock
}
// GetContainerInfo provides a mock function with given fields: podFullName, uid, containerName, req
func (_m *MockStatsProvider) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
ret := _m.Called(podFullName, uid, containerName, req)
var r0 *cadvisorapi.ContainerInfo
if rf, ok := ret.Get(0).(func(string, types.UID, string, *cadvisorapi.ContainerInfoRequest) *cadvisorapi.ContainerInfo); ok {
r0 = rf(podFullName, uid, containerName, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*cadvisorapi.ContainerInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, types.UID, string, *cadvisorapi.ContainerInfoRequest) error); ok {
r1 = rf(podFullName, uid, containerName, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetContainerInfoV2 provides a mock function with given fields: name, options
func (_m *MockStatsProvider) GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
ret := _m.Called(name, options)
var r0 map[string]cadvisorapiv2.ContainerInfo
if rf, ok := ret.Get(0).(func(string, cadvisorapiv2.RequestOptions) map[string]cadvisorapiv2.ContainerInfo); ok {
r0 = rf(name, options)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]cadvisorapiv2.ContainerInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, cadvisorapiv2.RequestOptions) error); ok {
r1 = rf(name, options)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetRawContainerInfo provides a mock function with given fields: containerName, req, subcontainers
func (_m *MockStatsProvider) GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error) {
ret := _m.Called(containerName, req, subcontainers)
var r0 map[string]*cadvisorapi.ContainerInfo
if rf, ok := ret.Get(0).(func(string, *cadvisorapi.ContainerInfoRequest, bool) map[string]*cadvisorapi.ContainerInfo); ok {
r0 = rf(containerName, req, subcontainers)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]*cadvisorapi.ContainerInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, *cadvisorapi.ContainerInfoRequest, bool) error); ok {
r1 = rf(containerName, req, subcontainers)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetPodByName provides a mock function with given fields: namespace, name
func (_m *MockStatsProvider) GetPodByName(namespace string, name string) (*api.Pod, bool) {
ret := _m.Called(namespace, name)
var r0 *api.Pod
if rf, ok := ret.Get(0).(func(string, string) *api.Pod); ok {
r0 = rf(namespace, name)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*api.Pod)
}
}
var r1 bool
if rf, ok := ret.Get(1).(func(string, string) bool); ok {
r1 = rf(namespace, name)
} else {
r1 = ret.Get(1).(bool)
}
return r0, r1
}
// GetNode provides a mock function with given fields:
func (_m *MockStatsProvider) GetNode() (*api.Node, error) {
ret := _m.Called()
var r0 *api.Node
if rf, ok := ret.Get(0).(func() *api.Node); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*api.Node)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetNodeConfig provides a mock function with given fields:
func (_m *MockStatsProvider) GetNodeConfig() cm.NodeConfig {
ret := _m.Called()
var r0 cm.NodeConfig
if rf, ok := ret.Get(0).(func() cm.NodeConfig); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(cm.NodeConfig)
}
return r0
}
// DockerImagesFsInfo provides a mock function with given fields:
func (_m *MockStatsProvider) DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error) {
ret := _m.Called()
var r0 cadvisorapiv2.FsInfo
if rf, ok := ret.Get(0).(func() cadvisorapiv2.FsInfo); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(cadvisorapiv2.FsInfo)
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RootFsInfo provides a mock function with given fields:
func (_m *MockStatsProvider) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
ret := _m.Called()
var r0 cadvisorapiv2.FsInfo
if rf, ok := ret.Get(0).(func() cadvisorapiv2.FsInfo); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(cadvisorapiv2.FsInfo)
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ListVolumesForPod provides a mock function with given fields: podUID
func (_m *MockStatsProvider) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
ret := _m.Called(podUID)
var r0 map[string]volume.Volume
if rf, ok := ret.Get(0).(func(types.UID) map[string]volume.Volume); ok {
r0 = rf(podUID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]volume.Volume)
}
}
var r1 bool
if rf, ok := ret.Get(1).(func(types.UID) bool); ok {
r1 = rf(podUID)
} else {
r1 = ret.Get(1).(bool)
}
return r0, r1
}
// GetPods provides a mock function with given fields:
func (_m *MockStatsProvider) GetPods() []*api.Pod {
ret := _m.Called()
var r0 []*api.Pod
if rf, ok := ret.Get(0).(func() []*api.Pod); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*api.Pod)
}
}
return r0
}

View File

@ -0,0 +1,43 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import "time"
// ResourceAnalyzer provides statistics on node resource consumption
type ResourceAnalyzer interface {
Start()
fsResourceAnalyzerInterface
}
// resourceAnalyzer implements ResourceAnalyzer
type resourceAnalyzer struct {
*fsResourceAnalyzer
}
var _ ResourceAnalyzer = &resourceAnalyzer{}
// NewResourceAnalyzer returns a new ResourceAnalyzer
func NewResourceAnalyzer(statsProvider StatsProvider, calVolumeFrequency time.Duration) ResourceAnalyzer {
return &resourceAnalyzer{newFsResourceAnalyzer(statsProvider, calVolumeFrequency)}
}
// Start starts background functions necessary for the ResourceAnalyzer to function
func (ra *resourceAnalyzer) Start() {
ra.fsResourceAnalyzer.Start()
}

View File

@ -18,16 +18,19 @@ package stats
import ( import (
"fmt" "fmt"
"runtime"
"time" "time"
"github.com/golang/glog"
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/leaky" "k8s.io/kubernetes/pkg/kubelet/leaky"
"k8s.io/kubernetes/pkg/types"
"github.com/golang/glog"
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
) )
type SummaryProvider interface { type SummaryProvider interface {
@ -36,14 +39,17 @@ type SummaryProvider interface {
} }
type summaryProviderImpl struct { type summaryProviderImpl struct {
provider StatsProvider provider StatsProvider
resourceAnalyzer ResourceAnalyzer
} }
var _ SummaryProvider = &summaryProviderImpl{} var _ SummaryProvider = &summaryProviderImpl{}
// NewSummaryProvider returns a new SummaryProvider // NewSummaryProvider returns a new SummaryProvider
func NewSummaryProvider(statsProvider StatsProvider) SummaryProvider { func NewSummaryProvider(statsProvider StatsProvider, resourceAnalyzer ResourceAnalyzer) SummaryProvider {
return &summaryProviderImpl{statsProvider} stackBuff := []byte{}
runtime.Stack(stackBuff, false)
return &summaryProviderImpl{statsProvider, resourceAnalyzer}
} }
// Get implements the SummaryProvider interface // Get implements the SummaryProvider interface
@ -74,17 +80,18 @@ func (sp *summaryProviderImpl) Get() (*Summary, error) {
return nil, err return nil, err
} }
sb := &summaryBuilder{node, nodeConfig, rootFsInfo, imageFsInfo, infos} sb := &summaryBuilder{sp.resourceAnalyzer, node, nodeConfig, rootFsInfo, imageFsInfo, infos}
return sb.build() return sb.build()
} }
// summaryBuilder aggregates the datastructures provided by cadvisor into a Summary result // summaryBuilder aggregates the datastructures provided by cadvisor into a Summary result
type summaryBuilder struct { type summaryBuilder struct {
node *api.Node resourceAnalyzer ResourceAnalyzer
nodeConfig cm.NodeConfig node *api.Node
rootFsInfo cadvisorapiv2.FsInfo nodeConfig cm.NodeConfig
imageFsInfo cadvisorapiv2.FsInfo rootFsInfo cadvisorapiv2.FsInfo
infos map[string]cadvisorapiv2.ContainerInfo imageFsInfo cadvisorapiv2.FsInfo
infos map[string]cadvisorapiv2.ContainerInfo
} }
// build returns a Summary from aggregating the input data // build returns a Summary from aggregating the input data
@ -153,7 +160,8 @@ func (sb *summaryBuilder) containerInfoV2FsStats(
} }
cfs := lcs.Filesystem cfs := lcs.Filesystem
if cfs != nil && cfs.BaseUsageBytes != nil { if cfs != nil && cfs.BaseUsageBytes != nil {
cs.Rootfs.UsedBytes = cfs.BaseUsageBytes rootfsUsage := *cfs.BaseUsageBytes
cs.Rootfs.UsedBytes = &rootfsUsage
if cfs.TotalUsageBytes != nil { if cfs.TotalUsageBytes != nil {
logsUsage := *cfs.TotalUsageBytes - *cfs.BaseUsageBytes logsUsage := *cfs.TotalUsageBytes - *cfs.BaseUsageBytes
cs.Logs.UsedBytes = &logsUsage cs.Logs.UsedBytes = &logsUsage
@ -207,6 +215,11 @@ func (sb *summaryBuilder) buildSummaryPods() []PodStats {
// Add each PodStats to the result // Add each PodStats to the result
result := make([]PodStats, 0, len(podToStats)) result := make([]PodStats, 0, len(podToStats))
for _, stats := range podToStats { for _, stats := range podToStats {
// Lookup the volume stats for each pod
podUID := types.UID(stats.PodRef.UID)
if vstats, found := sb.resourceAnalyzer.GetPodVolumeStats(podUID); found {
stats.VolumeStats = vstats.Volumes
}
result = append(result, *stats) result = append(result, *stats)
} }
return result return result

View File

@ -104,7 +104,8 @@ func TestBuildSummary(t *testing.T) {
rootfs := v2.FsInfo{} rootfs := v2.FsInfo{}
imagefs := v2.FsInfo{} imagefs := v2.FsInfo{}
sb := &summaryBuilder{&node, nodeConfig, rootfs, imagefs, infos} sb := &summaryBuilder{
newFsResourceAnalyzer(&MockStatsProvider{}, time.Minute*5), &node, nodeConfig, rootfs, imagefs, infos}
summary, err := sb.build() summary, err := sb.build()
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -166,6 +166,19 @@ type volumeTuple struct {
Name string Name string
} }
// ListVolumesForPod returns a map of the volumes associated with the given pod
func (kl *Kubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
result := map[string]volume.Volume{}
vm, ok := kl.volumeManager.GetVolumes(podUID)
if !ok {
return result, false
}
for name, info := range vm {
result[name] = info.Builder
}
return result, true
}
func (kl *Kubelet) getPodVolumes(podUID types.UID) ([]*volumeTuple, error) { func (kl *Kubelet) getPodVolumes(podUID types.UID) ([]*volumeTuple, error) {
var volumes []*volumeTuple var volumes []*volumeTuple
podVolDir := kl.getPodVolumesDir(podUID) podVolDir := kl.getPodVolumesDir(podUID)

View File

@ -0,0 +1,69 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volume
import (
"sync"
"sync/atomic"
)
var _ MetricsProvider = &cachedMetrics{}
// cachedMetrics represents a MetricsProvider that wraps another provider and caches the result.
type cachedMetrics struct {
wrapped MetricsProvider
resultError error
resultMetrics *Metrics
once cacheOnce
}
// NewCachedMetrics creates a new cachedMetrics wrapping another MetricsProvider and caching the results.
func NewCachedMetrics(provider MetricsProvider) MetricsProvider {
return &cachedMetrics{wrapped: provider}
}
// See MetricsProvider.GetMetrics
// Runs GetMetrics Once and caches the result. Will not cache result if there is an error.
func (md *cachedMetrics) GetMetrics() (*Metrics, error) {
md.once.cache(func() error {
md.resultMetrics, md.resultError = md.wrapped.GetMetrics()
return md.resultError
})
return md.resultMetrics, md.resultError
}
// Copied from sync.Once but we don't want to cache the results if there is an error
type cacheOnce struct {
m sync.Mutex
done uint32
}
// Copied from sync.Once but we don't want to cache the results if there is an error
func (o *cacheOnce) cache(f func() error) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
err := f()
if err == nil {
atomic.StoreUint32(&o.done, 1)
}
}
}

62
pkg/volume/mock_volume.go Normal file
View File

@ -0,0 +1,62 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volume
import "github.com/stretchr/testify/mock"
// ORIGINALLY GENERATED BY mockery with hand edits
type MockVolume struct {
mock.Mock
}
// GetPath provides a mock function with given fields:
func (_m *MockVolume) GetPath() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// GetMetrics provides a mock function with given fields:
func (_m *MockVolume) GetMetrics() (*Metrics, error) {
ret := _m.Called()
var r0 *Metrics
if rf, ok := ret.Get(0).(func() *Metrics); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*Metrics)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View File

@ -72,7 +72,7 @@ func (plugin *secretPlugin) NewBuilder(spec *volume.Spec, pod *api.Pod, opts vol
plugin, plugin,
plugin.host.GetMounter(), plugin.host.GetMounter(),
plugin.host.GetWriter(), plugin.host.GetWriter(),
volume.MetricsNil{}, volume.NewCachedMetrics(volume.NewMetricsDu(getPathFromHost(plugin.host, pod.UID, spec.Name()))),
}, },
secretName: spec.Volume.Secret.SecretName, secretName: spec.Volume.Secret.SecretName,
pod: *pod, pod: *pod,
@ -88,7 +88,7 @@ func (plugin *secretPlugin) NewCleaner(volName string, podUID types.UID) (volume
plugin, plugin,
plugin.host.GetMounter(), plugin.host.GetMounter(),
plugin.host.GetWriter(), plugin.host.GetWriter(),
volume.MetricsNil{}, volume.NewCachedMetrics(volume.NewMetricsDu(getPathFromHost(plugin.host, podUID, volName))),
}, },
}, nil }, nil
} }
@ -99,13 +99,17 @@ type secretVolume struct {
plugin *secretPlugin plugin *secretPlugin
mounter mount.Interface mounter mount.Interface
writer ioutil.Writer writer ioutil.Writer
volume.MetricsNil volume.MetricsProvider
} }
var _ volume.Volume = &secretVolume{} var _ volume.Volume = &secretVolume{}
func (sv *secretVolume) GetPath() string { func (sv *secretVolume) GetPath() string {
return sv.plugin.host.GetPodVolumeDir(sv.podUID, strings.EscapeQualifiedNameForDisk(secretPluginName), sv.volName) return getPathFromHost(sv.plugin.host, sv.podUID, sv.volName)
}
func getPathFromHost(host volume.VolumeHost, podUID types.UID, volName string) string {
return host.GetPodVolumeDir(podUID, strings.EscapeQualifiedNameForDisk(secretPluginName), volName)
} }
// secretVolumeBuilder handles retrieving secrets from the API server // secretVolumeBuilder handles retrieving secrets from the API server

View File

@ -24,6 +24,7 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/testing/fake" "k8s.io/kubernetes/pkg/client/testing/fake"
@ -121,6 +122,11 @@ func TestPlugin(t *testing.T) {
} }
} }
doTestSecretDataInVolume(volumePath, secret, t) doTestSecretDataInVolume(volumePath, secret, t)
metrics, err := builder.GetMetrics()
assert.NotEmpty(t, metrics)
assert.NoError(t, err)
doTestCleanAndTeardown(plugin, testPodUID, testVolumeName, volumePath, t) doTestCleanAndTeardown(plugin, testPodUID, testVolumeName, volumePath, t)
} }

View File

@ -20,16 +20,18 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"strings" "strings"
"time" "time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"io/ioutil"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
"github.com/davecgh/go-spew/spew"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
) )
var _ = Describe("Kubelet", func() { var _ = Describe("Kubelet", func() {
@ -66,20 +68,16 @@ var _ = Describe("Kubelet", func() {
}) })
It("it should print the output to logs", func() { It("it should print the output to logs", func() {
errs := Retry(time.Minute, time.Second*4, func() error { Eventually(func() string {
rc, err := cl.Pods(api.NamespaceDefault).GetLogs("busybox", &api.PodLogOptions{}).Stream() rc, err := cl.Pods(api.NamespaceDefault).GetLogs("busybox", &api.PodLogOptions{}).Stream()
if err != nil { if err != nil {
return err return ""
} }
defer rc.Close() defer rc.Close()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
buf.ReadFrom(rc) buf.ReadFrom(rc)
if buf.String() != "'Hello World'\n" { return buf.String()
return fmt.Errorf("Expected %s to match 'Hello World'", buf.String()) }, time.Second*30, time.Second*4).Should(Equal("'Hello World'\n"))
}
return nil
})
Expect(errs).To(BeEmpty(), fmt.Sprintf("Failed to get Logs"))
}) })
It("it should be possible to delete", func() { It("it should be possible to delete", func() {
@ -101,9 +99,16 @@ var _ = Describe("Kubelet", func() {
createPod(cl, podName, []api.Container{ createPod(cl, podName, []api.Container{
{ {
Image: "gcr.io/google_containers/busybox", Image: "gcr.io/google_containers/busybox",
Command: []string{"sh", "-c", "echo 'Hello World' | tee ~/file | tee -a ~/file | tee /test-empty-dir | sleep 60"}, Command: []string{"sh", "-c", "echo 'Hello World' | tee ~/file | tee /test-empty-dir-mnt | sleep 60"},
Name: podName + containerSuffix, Name: podName + containerSuffix,
VolumeMounts: []api.VolumeMount{
{MountPath: "/test-empty-dir-mnt", Name: "test-empty-dir"},
},
}, },
}, []api.Volume{
// TODO: Test secret volumes
// TODO: Test hostpath volumes
{Name: "test-empty-dir", VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}},
}) })
} }
@ -113,6 +118,7 @@ var _ = Describe("Kubelet", func() {
Context("when querying /stats/summary", func() { Context("when querying /stats/summary", func() {
It("it should report resource usage through the stats api", func() { It("it should report resource usage through the stats api", func() {
By("Returning stats summary")
resp, err := http.Get(*kubeletAddress + "/stats/summary") resp, err := http.Get(*kubeletAddress + "/stats/summary")
now := time.Now() now := time.Now()
Expect(err).To(BeNil(), fmt.Sprintf("Failed to get /stats/summary")) Expect(err).To(BeNil(), fmt.Sprintf("Failed to get /stats/summary"))
@ -124,57 +130,84 @@ var _ = Describe("Kubelet", func() {
err = decoder.Decode(&summary) err = decoder.Decode(&summary)
Expect(err).To(BeNil(), fmt.Sprintf("Failed to parse /stats/summary to go struct: %+v", resp)) Expect(err).To(BeNil(), fmt.Sprintf("Failed to parse /stats/summary to go struct: %+v", resp))
// Verify Misc Stats By("Having the correct time")
Expect(summary.Time.Time).To(BeTemporally("~", now, 20*time.Second)) Expect(summary.Time.Time).To(BeTemporally("~", now, 20*time.Second))
// Verify Node Stats are present By("Having resources for node")
Expect(summary.Node.NodeName).To(Equal(*nodeName)) Expect(summary.Node.NodeName).To(Equal(*nodeName))
Expect(summary.Node.CPU.UsageCoreNanoSeconds).NotTo(BeZero()) Expect(summary.Node.CPU.UsageCoreNanoSeconds).NotTo(BeNil())
Expect(summary.Node.Memory.UsageBytes).NotTo(BeZero()) Expect(*summary.Node.CPU.UsageCoreNanoSeconds).NotTo(BeZero())
Expect(summary.Node.Memory.WorkingSetBytes).NotTo(BeZero())
Expect(summary.Node.Fs.UsedBytes).NotTo(BeZero())
Expect(summary.Node.Fs.CapacityBytes).NotTo(BeZero())
Expect(summary.Node.Fs.AvailableBytes).NotTo(BeZero())
Expect(summary.Node.Memory.UsageBytes).NotTo(BeNil())
Expect(*summary.Node.Memory.UsageBytes).NotTo(BeZero())
Expect(summary.Node.Memory.WorkingSetBytes).NotTo(BeNil())
Expect(*summary.Node.Memory.WorkingSetBytes).NotTo(BeZero())
Expect(summary.Node.Fs.AvailableBytes).NotTo(BeNil())
Expect(*summary.Node.Fs.AvailableBytes).NotTo(BeZero())
Expect(summary.Node.Fs.CapacityBytes).NotTo(BeNil())
Expect(*summary.Node.Fs.CapacityBytes).NotTo(BeZero())
Expect(summary.Node.Fs.UsedBytes).NotTo(BeNil())
Expect(*summary.Node.Fs.UsedBytes).NotTo(BeZero())
By("Having resources for kubelet and runtime system containers")
sysContainers := map[string]stats.ContainerStats{} sysContainers := map[string]stats.ContainerStats{}
sysContainersList := []string{} sysContainersList := []string{}
for _, container := range summary.Node.SystemContainers { for _, container := range summary.Node.SystemContainers {
sysContainers[container.Name] = container sysContainers[container.Name] = container
sysContainersList = append(sysContainersList, container.Name) sysContainersList = append(sysContainersList, container.Name)
Expect(container.CPU.UsageCoreNanoSeconds).NotTo(BeZero()) ExpectContainerStatsNotEmpty(&container)
// TODO: Test Network
Expect(container.Memory.UsageBytes).NotTo(BeZero())
Expect(container.Memory.WorkingSetBytes).NotTo(BeZero())
Expect(container.Rootfs.CapacityBytes).NotTo(BeZero())
Expect(container.Rootfs.AvailableBytes).NotTo(BeZero())
Expect(container.Logs.CapacityBytes).NotTo(BeZero())
Expect(container.Logs.AvailableBytes).NotTo(BeZero())
} }
Expect(sysContainersList).To(ConsistOf("kubelet", "runtime")) Expect(sysContainersList).To(ConsistOf("kubelet", "runtime"))
// Verify Pods Stats are present // Verify Pods Stats are present
podsList := []string{} podsList := []string{}
By("Having resources for pods")
for _, pod := range summary.Pods { for _, pod := range summary.Pods {
if !strings.HasPrefix(pod.PodRef.Name, statsPrefix) { if !strings.HasPrefix(pod.PodRef.Name, statsPrefix) {
// Ignore pods created outside this test // Ignore pods created outside this test
continue continue
} }
// TODO: Test network
podsList = append(podsList, pod.PodRef.Name) podsList = append(podsList, pod.PodRef.Name)
Expect(pod.Containers).To(HaveLen(1)) Expect(pod.Containers).To(HaveLen(1))
container := pod.Containers[0] container := pod.Containers[0]
Expect(container.Name).To(Equal(pod.PodRef.Name + containerSuffix)) Expect(container.Name).To(Equal(pod.PodRef.Name + containerSuffix))
Expect(container.CPU.UsageCoreNanoSeconds).NotTo(BeZero())
Expect(container.Memory.UsageBytes).NotTo(BeZero()) ExpectContainerStatsNotEmpty(&container)
Expect(container.Memory.WorkingSetBytes).NotTo(BeZero())
Expect(container.Rootfs.CapacityBytes).NotTo(BeZero()) // emptydir volume
Expect(container.Rootfs.AvailableBytes).NotTo(BeZero()) volumeNames := []string{}
Expect(*container.Rootfs.UsedBytes).NotTo(BeZero(), contents) for _, vs := range pod.VolumeStats {
Expect(container.Logs.CapacityBytes).NotTo(BeZero()) Expect(vs.CapacityBytes).NotTo(BeZero())
Expect(container.Logs.AvailableBytes).NotTo(BeZero()) Expect(vs.AvailableBytes).NotTo(BeZero())
Expect(*container.Logs.UsedBytes).NotTo(BeZero(), contents) Expect(vs.UsedBytes).NotTo(BeZero())
if strings.HasPrefix(vs.Name, "default-token-") {
volumeNames = append(volumeNames, "default-token-")
} else {
volumeNames = append(volumeNames, vs.Name)
}
}
Expect(volumeNames).To(ConsistOf("default-token-", "test-empty-dir"))
// fs usage (not for system containers)
Expect(container.Rootfs).NotTo(BeNil(), spew.Sdump(container))
Expect(container.Rootfs.AvailableBytes).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.Rootfs.AvailableBytes).NotTo(BeZero(), spew.Sdump(container))
Expect(container.Rootfs.CapacityBytes).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.Rootfs.CapacityBytes).NotTo(BeZero(), spew.Sdump(container))
Expect(container.Rootfs.UsedBytes).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.Rootfs.UsedBytes).NotTo(BeZero(), spew.Sdump(container))
Expect(container.Logs).NotTo(BeNil(), spew.Sdump(container))
Expect(container.Logs.AvailableBytes).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.Logs.AvailableBytes).NotTo(BeZero(), spew.Sdump(container))
Expect(container.Logs.CapacityBytes).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.Logs.CapacityBytes).NotTo(BeZero(), spew.Sdump(container))
Expect(container.Logs.UsedBytes).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.Logs.UsedBytes).NotTo(BeZero(), spew.Sdump(container))
} }
Expect(podsList).To(ConsistOf(podNames)) Expect(podsList).To(ConsistOf(podNames))
}) })
@ -189,11 +222,25 @@ var _ = Describe("Kubelet", func() {
}) })
}) })
func ExpectContainerStatsNotEmpty(container *stats.ContainerStats) {
// TODO: Test Network
Expect(container.CPU).NotTo(BeNil(), spew.Sdump(container))
Expect(container.CPU.UsageCoreNanoSeconds).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.CPU.UsageCoreNanoSeconds).NotTo(BeZero(), spew.Sdump(container))
Expect(container.Memory).NotTo(BeNil(), spew.Sdump(container))
Expect(container.Memory.UsageBytes).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.Memory.UsageBytes).NotTo(BeZero(), spew.Sdump(container))
Expect(container.Memory.WorkingSetBytes).NotTo(BeNil(), spew.Sdump(container))
Expect(*container.Memory.WorkingSetBytes).NotTo(BeZero(), spew.Sdump(container))
}
const ( const (
containerSuffix = "-c" containerSuffix = "-c"
) )
func createPod(cl *client.Client, podName string, containers []api.Container) { func createPod(cl *client.Client, podName string, containers []api.Container, volumes []api.Volume) {
pod := &api.Pod{ pod := &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: podName, Name: podName,
@ -205,6 +252,7 @@ func createPod(cl *client.Client, podName string, containers []api.Container) {
// Don't restart the Pod since it is expected to exit // Don't restart the Pod since it is expected to exit
RestartPolicy: api.RestartPolicyNever, RestartPolicy: api.RestartPolicyNever,
Containers: containers, Containers: containers,
Volumes: volumes,
}, },
} }
_, err := cl.Pods(api.NamespaceDefault).Create(pod) _, err := cl.Pods(api.NamespaceDefault).Create(pod)

View File

@ -1,48 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"time"
)
// RetryFn represents a retryable test condition. It returns an error if the condition is not met
// otherwise returns nil for success.
type RetryFn func() error
// Retry retries the RetryFn for a maximum of maxWait time. The wait duration is waited between
// retries. If the success condition is not met in maxWait time, the list of encountered errors
// is returned. If successful returns an empty list.
// Example:
// Expect(Retry(time.Minute*1, time.Second*2, func() error {
// if success {
// return nil
// } else {
// return errors.New("Failed")
// }
// }).To(BeNil(), fmt.Sprintf("Failed"))
func Retry(maxWait time.Duration, wait time.Duration, retry RetryFn) []error {
errs := []error{}
for start := time.Now(); time.Now().Before(start.Add(maxWait)); {
if err := retry(); err != nil {
errs = append(errs, err)
} else {
return []error{}
}
}
return errs
}